aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2017-08-01 10:58:29 -0700
committerGravatar ncteisen <ncteisen@gmail.com>2017-08-01 10:58:29 -0700
commitab04027e474da7212109e4afbd9896d3bb78719c (patch)
tree75a3f7a981bbd454f35089a0dde8346d01a7d77e /src
parentfb193a1e2fd0093103101b2551dd3e7b9af8833d (diff)
parentd885d24161e9ebf7c1f1d225569c949cd909fd06 (diff)
Merge branch 'master' of https://github.com/grpc/grpc into flow-control-v3
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.c2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c2
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c3
-rw-r--r--src/core/lib/iomgr/combiner.c4
-rw-r--r--src/core/lib/iomgr/nameser.h104
-rw-r--r--src/core/lib/iomgr/port.h5
-rw-r--r--src/core/lib/surface/alarm.c3
-rw-r--r--src/core/lib/surface/call.c23
-rw-r--r--src/core/lib/surface/channel_ping.c2
-rw-r--r--src/core/lib/surface/completion_queue.c68
-rw-r--r--src/core/lib/surface/completion_queue.h5
-rw-r--r--src/core/lib/surface/server.c14
-rw-r--r--src/cpp/client/client_context.cc2
-rw-r--r--src/cpp/server/server_cc.cc50
-rw-r--r--src/node/src/server.js11
-rw-r--r--src/objective-c/GRPCClient/private/NSData+GRPC.m2
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m8
-rw-r--r--src/objective-c/tests/RxLibraryUnitTests.m70
-rwxr-xr-xsrc/objective-c/tests/build_one_example.sh6
-rwxr-xr-xsrc/objective-c/tests/run_tests.sh14
20 files changed, 312 insertions, 86 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index c3dca14305..b83c95275f 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -208,7 +208,7 @@ void grpc_channel_watch_connectivity_state(
7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, cq, tag));
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
gpr_mu_init(&w->mu);
GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
index 9065e33613..6ec3790a5f 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
@@ -33,13 +33,13 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
-#include <nameser.h>
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/nameser.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/support/string.h"
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 29dfa885de..776e0765fe 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -968,6 +968,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
s->header_array.capacity = s->header_array.count;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
+ if (url) {
+ gpr_free(url);
+ }
unsigned int header_index;
for (header_index = 0; header_index < s->header_array.count;
header_index++) {
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index c72c37e2b5..9b66987b68 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -73,10 +73,8 @@ static const grpc_closure_scheduler_vtable finally_scheduler = {
static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
grpc_combiner *grpc_combiner_create(void) {
- grpc_combiner *lock = gpr_malloc(sizeof(*lock));
+ grpc_combiner *lock = gpr_zalloc(sizeof(*lock));
gpr_ref_init(&lock->refs, 1);
- lock->next_combiner_on_this_exec_ctx = NULL;
- lock->time_to_execute_final_list = false;
lock->scheduler.vtable = &scheduler;
lock->finally_scheduler.vtable = &finally_scheduler;
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
diff --git a/src/core/lib/iomgr/nameser.h b/src/core/lib/iomgr/nameser.h
new file mode 100644
index 0000000000..daed6de518
--- /dev/null
+++ b/src/core/lib/iomgr/nameser.h
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_NAMESER_H
+#define GRPC_CORE_LIB_IOMGR_NAMESER_H
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_HAVE_ARPA_NAMESER
+
+#include <arpa/nameser.h>
+
+#else /* GRPC_HAVE_ARPA_NAMESER */
+
+typedef enum __ns_class {
+ ns_c_invalid = 0, /* Cookie. */
+ ns_c_in = 1, /* Internet. */
+ ns_c_2 = 2, /* unallocated/unsupported. */
+ ns_c_chaos = 3, /* MIT Chaos-net. */
+ ns_c_hs = 4, /* MIT Hesiod. */
+ /* Query class values which do not appear in resource records */
+ ns_c_none = 254, /* for prereq. sections in update requests */
+ ns_c_any = 255, /* Wildcard match. */
+ ns_c_max = 65536
+} ns_class;
+
+typedef enum __ns_type {
+ ns_t_invalid = 0, /* Cookie. */
+ ns_t_a = 1, /* Host address. */
+ ns_t_ns = 2, /* Authoritative server. */
+ ns_t_md = 3, /* Mail destination. */
+ ns_t_mf = 4, /* Mail forwarder. */
+ ns_t_cname = 5, /* Canonical name. */
+ ns_t_soa = 6, /* Start of authority zone. */
+ ns_t_mb = 7, /* Mailbox domain name. */
+ ns_t_mg = 8, /* Mail group member. */
+ ns_t_mr = 9, /* Mail rename name. */
+ ns_t_null = 10, /* Null resource record. */
+ ns_t_wks = 11, /* Well known service. */
+ ns_t_ptr = 12, /* Domain name pointer. */
+ ns_t_hinfo = 13, /* Host information. */
+ ns_t_minfo = 14, /* Mailbox information. */
+ ns_t_mx = 15, /* Mail routing information. */
+ ns_t_txt = 16, /* Text strings. */
+ ns_t_rp = 17, /* Responsible person. */
+ ns_t_afsdb = 18, /* AFS cell database. */
+ ns_t_x25 = 19, /* X_25 calling address. */
+ ns_t_isdn = 20, /* ISDN calling address. */
+ ns_t_rt = 21, /* Router. */
+ ns_t_nsap = 22, /* NSAP address. */
+ ns_t_nsap_ptr = 23, /* Reverse NSAP lookup (deprecated). */
+ ns_t_sig = 24, /* Security signature. */
+ ns_t_key = 25, /* Security key. */
+ ns_t_px = 26, /* X.400 mail mapping. */
+ ns_t_gpos = 27, /* Geographical position (withdrawn). */
+ ns_t_aaaa = 28, /* Ip6 Address. */
+ ns_t_loc = 29, /* Location Information. */
+ ns_t_nxt = 30, /* Next domain (security). */
+ ns_t_eid = 31, /* Endpoint identifier. */
+ ns_t_nimloc = 32, /* Nimrod Locator. */
+ ns_t_srv = 33, /* Server Selection. */
+ ns_t_atma = 34, /* ATM Address */
+ ns_t_naptr = 35, /* Naming Authority PoinTeR */
+ ns_t_kx = 36, /* Key Exchange */
+ ns_t_cert = 37, /* Certification record */
+ ns_t_a6 = 38, /* IPv6 address (deprecates AAAA) */
+ ns_t_dname = 39, /* Non-terminal DNAME (for IPv6) */
+ ns_t_sink = 40, /* Kitchen sink (experimentatl) */
+ ns_t_opt = 41, /* EDNS0 option (meta-RR) */
+ ns_t_apl = 42, /* Address prefix list (RFC3123) */
+ ns_t_ds = 43, /* Delegation Signer (RFC4034) */
+ ns_t_sshfp = 44, /* SSH Key Fingerprint (RFC4255) */
+ ns_t_rrsig = 46, /* Resource Record Signature (RFC4034) */
+ ns_t_nsec = 47, /* Next Secure (RFC4034) */
+ ns_t_dnskey = 48, /* DNS Public Key (RFC4034) */
+ ns_t_tkey = 249, /* Transaction key */
+ ns_t_tsig = 250, /* Transaction signature. */
+ ns_t_ixfr = 251, /* Incremental zone transfer. */
+ ns_t_axfr = 252, /* Transfer zone of authority. */
+ ns_t_mailb = 253, /* Transfer mailbox records. */
+ ns_t_maila = 254, /* Transfer mail agent records. */
+ ns_t_any = 255, /* Wildcard match. */
+ ns_t_zxfr = 256, /* BIND-specific, nonstandard. */
+ ns_t_max = 65536
+} ns_type;
+
+#endif /* GRPC_HAVE_ARPA_NAMESER */
+
+#endif /* GRPC_CORE_LIB_IOMGR_NAMESER_H */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index f5d15b4850..c12058f890 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -24,6 +24,7 @@
#if defined(GRPC_UV)
// Do nothing
#elif defined(GPR_MANYLINUX1)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
@@ -51,6 +52,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_LINUX)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
@@ -82,6 +84,7 @@
#define GRPC_POSIX_SOCKETUTILS
#endif
#elif defined(GPR_APPLE)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
#define GRPC_HAVE_UNIX_SOCKET 1
@@ -93,6 +96,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_FREEBSD)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
@@ -104,6 +108,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_NACL)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index ef8405cca8..55934964f3 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -18,6 +18,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/completion_queue.h"
@@ -49,7 +50,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
alarm->cq = cq;
alarm->tag = tag;
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 2365d27307..00ec9c7c9a 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -644,6 +644,8 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
static grpc_error *error_from_status(grpc_status_code status,
const char *description) {
+ // copying 'description' is needed to ensure the grpc_call_cancel_with_status
+ // guarantee that can be short-lived.
return grpc_error_set_int(
grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
GRPC_ERROR_STR_GRPC_MESSAGE,
@@ -823,7 +825,7 @@ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
return encodings_accepted_by_peer;
}
-static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) {
+static grpc_linked_mdelem *linked_from_md(const grpc_metadata *md) {
return (grpc_linked_mdelem *)&md->internal_data;
}
@@ -847,7 +849,7 @@ static int prepare_application_metadata(
for (i = 0; i < total_count; i++) {
const grpc_metadata *md =
get_md_elem(metadata, additional_metadata, i, count);
- grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ grpc_linked_mdelem *l = linked_from_md(md);
GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
if (!GRPC_LOG_IF_ERROR("validate_metadata",
grpc_validate_header_key_is_legal(md->key))) {
@@ -864,7 +866,7 @@ static int prepare_application_metadata(
for (int j = 0; j < i; j++) {
const grpc_metadata *md =
get_md_elem(metadata, additional_metadata, j, count);
- grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ grpc_linked_mdelem *l = linked_from_md(md);
GRPC_MDELEM_UNREF(exec_ctx, l->md);
}
return 0;
@@ -882,9 +884,12 @@ static int prepare_application_metadata(
}
for (i = 0; i < total_count; i++) {
grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count);
- GRPC_LOG_IF_ERROR(
- "prepare_application_metadata",
- grpc_metadata_batch_link_tail(exec_ctx, batch, linked_from_md(md)));
+ grpc_linked_mdelem *l = linked_from_md(md);
+ grpc_error *error = grpc_metadata_batch_link_tail(exec_ctx, batch, l);
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_MDELEM_UNREF(exec_ctx, l->md);
+ }
+ GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
}
call->send_extra_metadata_count = 0;
@@ -1422,7 +1427,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (nops == 0) {
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
free_no_op_completion, NULL,
gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1723,7 +1728,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
@@ -1844,6 +1849,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) {
return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
+ case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
+ return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
case GRPC_CALL_OK:
return "GRPC_CALL_OK";
}
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 80eb80af78..e85b308850 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
op->send_ping = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq);
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 3851993c92..3d82a32e82 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -196,7 +196,7 @@ typedef struct cq_vtable {
void (*init)(void *data);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
void (*destroy)(void *data);
- void (*begin_op)(grpc_completion_queue *cq, void *tag);
+ bool (*begin_op)(grpc_completion_queue *cq, void *tag);
void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
@@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq);
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq, void *tag,
@@ -522,33 +522,6 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
}
}
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
-}
-
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_ref(&cqd->pending_events);
-}
-
-void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
-#ifndef NDEBUG
- gpr_mu_lock(cq->mu);
- if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
- cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
- cq->outstanding_tags =
- gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
- cq->outstanding_tag_capacity);
- }
- cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
- gpr_mu_unlock(cq->mu);
-#endif
- cq->vtable->begin_op(cq, tag);
-}
-
#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
int found = 0;
@@ -576,6 +549,41 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+ while (true) {
+ gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events);
+ if (count == 0) {
+ return false;
+ } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) {
+ break;
+ }
+ }
+ return true;
+}
+
+static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ GPR_ASSERT(!cqd->shutdown_called);
+ gpr_ref(&cqd->pending_events);
+ return true;
+}
+
+bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+#ifndef NDEBUG
+ gpr_mu_lock(cq->mu);
+ if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
+ cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
+ cq->outstanding_tags =
+ gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
+ cq->outstanding_tag_capacity);
+ }
+ cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
+ gpr_mu_unlock(cq->mu);
+#endif
+ return cq->vtable->begin_op(cq, tag);
+}
+
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index af44482513..69d144bd95 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made.
- \a tag is currently used only in debug builds. */
-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
+ \a tag is currently used only in debug builds. Return true on success, and
+ false if completion_queue has been shutdown. */
+bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index fce7f8dca1..66dcc299aa 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1259,7 +1259,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
}
/* stay locked, and gather up some stuff to do */
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
NULL, gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1446,7 +1446,11 @@ grpc_call_error grpc_server_request_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
details->reserved = NULL;
rc->cq_idx = cq_idx;
rc->type = BATCH_CALL;
@@ -1496,7 +1500,11 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
rc->server = server;
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 14cacc8f18..3af8bdc11a 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -24,6 +24,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <grpc++/impl/grpc_library.h>
#include <grpc++/security/credentials.h>
#include <grpc++/server_context.h>
#include <grpc++/support/time.h>
@@ -38,6 +39,7 @@ class DefaultGlobalClientCallbacks final
void Destructor(ClientContext* context) override {}
};
+static internal::GrpcLibraryInitializer g_gli_initializer;
static DefaultGlobalClientCallbacks g_default_client_callbacks;
static ClientContext::GlobalCallbacks* g_client_callbacks =
&g_default_client_callbacks;
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 92bacbe061..2483300cb1 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -151,19 +151,25 @@ class Server::SyncRequest final : public CompletionQueueTag {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
if (tag_) {
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_server_request_registered_call(
- server, tag_, &call_, &deadline_, &request_metadata_,
- has_request_payload_ ? &request_payload_ : nullptr, cq_,
- notify_cq, this));
+ if (GRPC_CALL_OK !=
+ grpc_server_request_registered_call(
+ server, tag_, &call_, &deadline_, &request_metadata_,
+ has_request_payload_ ? &request_payload_ : nullptr, cq_,
+ notify_cq, this)) {
+ TeardownRequest();
+ return;
+ }
} else {
if (!call_details_) {
call_details_ = new grpc_call_details;
grpc_call_details_init(call_details_);
}
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
- server, &call_, call_details_,
- &request_metadata_, cq_, notify_cq, this));
+ if (grpc_server_request_call(server, &call_, call_details_,
+ &request_metadata_, cq_, notify_cq,
+ this) != GRPC_CALL_OK) {
+ TeardownRequest();
+ return;
+ }
}
}
@@ -286,12 +292,10 @@ class Server::SyncRequestThreadManager : public ThreadManager {
if (ok) {
// Calldata takes ownership of the completion queue inside sync_req
SyncRequest::CallData cd(server_, sync_req);
- {
- // Prepare for the next request
- if (!IsShutdown()) {
- sync_req->SetupRequest(); // Create new completion queue for sync_req
- sync_req->Request(server_->c_server(), server_cq_->cq());
- }
+ // Prepare for the next request
+ if (!IsShutdown()) {
+ sync_req->SetupRequest(); // Create new completion queue for sync_req
+ sync_req->Request(server_->c_server(), server_cq_->cq());
}
GPR_TIMER_SCOPE("cd.Run()", 0);
@@ -316,8 +320,8 @@ class Server::SyncRequestThreadManager : public ThreadManager {
}
void Shutdown() override {
- server_cq_->Shutdown();
ThreadManager::Shutdown();
+ server_cq_->Shutdown();
}
void Wait() override {
@@ -652,10 +656,11 @@ ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq) {
- grpc_server_request_registered_call(
- server_->server(), registered_method, &call_, &context_->deadline_,
- context_->client_metadata_.arr(), payload, call_cq_->cq(),
- notification_cq->cq(), this);
+ GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call(
+ server_->server(), registered_method, &call_,
+ &context_->deadline_,
+ context_->client_metadata_.arr(), payload,
+ call_cq_->cq(), notification_cq->cq(), this));
}
ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
@@ -667,9 +672,10 @@ ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
- grpc_server_request_call(server->server(), &call_, &call_details_,
- context->client_metadata_.arr(), call_cq->cq(),
- notification_cq->cq(), this);
+ GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
+ server->server(), &call_, &call_details_,
+ context->client_metadata_.arr(), call_cq->cq(),
+ notification_cq->cq(), this));
}
bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
diff --git a/src/node/src/server.js b/src/node/src/server.js
index fba0aac938..8b7c0b6862 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -919,11 +919,6 @@ Server.prototype.addService = function(service, implementation) {
});
};
-var logAddProtoServiceDeprecationOnce = _.once(function() {
- common.log(constants.logVerbosity.INFO,
- 'Server#addProtoService is deprecated. Use addService instead');
-});
-
/**
* Add a proto service to the server, with a corresponding implementation
* @deprecated Use {@link grpc.Server#addService} instead
@@ -931,11 +926,11 @@ var logAddProtoServiceDeprecationOnce = _.once(function() {
* @param {Object<String, grpc.Server~handleCall>} implementation Map of method
* names to method implementation for the provided service.
*/
-Server.prototype.addProtoService = function(service, implementation) {
+Server.prototype.addProtoService = util.deprecate(function(service,
+ implementation) {
var options;
var protobuf_js_5_common = require('./protobuf_js_5_common');
var protobuf_js_6_common = require('./protobuf_js_6_common');
- logAddProtoServiceDeprecationOnce();
if (protobuf_js_5_common.isProbablyProtobufJs5(service)) {
options = _.defaults(service.grpc_options, common.defaultGrpcOptions);
this.addService(
@@ -950,7 +945,7 @@ Server.prototype.addProtoService = function(service, implementation) {
// We assume that this is a service attributes object
this.addService(service, implementation);
}
-};
+}, 'Server#addProtoService: Use Server#addService instead');
/**
* Binds the server to the given port, with SSL disabled if creds is an
diff --git a/src/objective-c/GRPCClient/private/NSData+GRPC.m b/src/objective-c/GRPCClient/private/NSData+GRPC.m
index 7ee76ad333..7c46594dd5 100644
--- a/src/objective-c/GRPCClient/private/NSData+GRPC.m
+++ b/src/objective-c/GRPCClient/private/NSData+GRPC.m
@@ -47,6 +47,8 @@ static void MallocAndCopyByteBufferToCharArray(grpc_byte_buffer *buffer,
grpc_slice_unref(slice);
*array = result;
*length = uncompressed_length;
+
+ grpc_byte_buffer_reader_destroy(&reader);
}
static grpc_byte_buffer *CopyCharArrayToNewByteBuffer(const char *array,
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index 99cb0ad971..577a5e9a42 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -110,4 +110,12 @@
self.state = GRXWriterStateFinished;
}
+- (void)dealloc {
+ GRXWriterState state = self.state;
+ if (state == GRXWriterStateNotStarted ||
+ state == GRXWriterStatePaused) {
+ dispatch_resume(_writeQueue);
+ }
+}
+
@end
diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m
index fa3ded4c0c..3a5adbbf37 100644
--- a/src/objective-c/tests/RxLibraryUnitTests.m
+++ b/src/objective-c/tests/RxLibraryUnitTests.m
@@ -213,4 +213,74 @@
XCTAssertEqualObjects(handler.errorOrNil, nil);
}
+#define WRITE_ROUNDS (1000)
+- (void)testBufferedPipeResumeWhenDealloc {
+ id anyValue = @7;
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ }];
+
+ // Release after alloc;
+ GRXBufferedPipe *pipe = [GRXBufferedPipe pipe];
+ pipe = nil;
+
+ // Release after write but before start
+ pipe = [GRXBufferedPipe pipe];
+ for (int i = 0; i < WRITE_ROUNDS; i++) {
+ [pipe writeValue:anyValue];
+ }
+ pipe = nil;
+
+ // Release after start but not write
+ pipe = [GRXBufferedPipe pipe];
+ [pipe startWithWriteable:writeable];
+ pipe = nil;
+
+ // Release after start and write
+ pipe = [GRXBufferedPipe pipe];
+ for (int i = 0; i < WRITE_ROUNDS; i++) {
+ [pipe writeValue:anyValue];
+ }
+ [pipe startWithWriteable:writeable];
+ pipe = nil;
+
+ // Release after start, write and pause
+ pipe = [GRXBufferedPipe pipe];
+ [pipe startWithWriteable:writeable];
+ for (int i = 0; i < WRITE_ROUNDS; i++) {
+ [pipe writeValue:anyValue];
+ }
+ pipe.state = GRXWriterStatePaused;
+ for (int i = 0; i < WRITE_ROUNDS; i++) {
+ [pipe writeValue:anyValue];
+ }
+ pipe = nil;
+
+ // Release after start, write, pause and finish
+ pipe = [GRXBufferedPipe pipe];
+ [pipe startWithWriteable:writeable];
+ for (int i = 0; i < WRITE_ROUNDS; i++) {
+ [pipe writeValue:anyValue];
+ }
+ pipe.state = GRXWriterStatePaused;
+ for (int i = 0; i < WRITE_ROUNDS; i++) {
+ [pipe writeValue:anyValue];
+ }
+ [pipe finishWithError:nil];
+ pipe = nil;
+
+ // Release after start, write, pause, finish and resume
+ pipe = [GRXBufferedPipe pipe];
+ [pipe startWithWriteable:writeable];
+ for (int i = 0; i < WRITE_ROUNDS; i++) {
+ [pipe writeValue:anyValue];
+ }
+ pipe.state = GRXWriterStatePaused;
+ for (int i = 0; i < WRITE_ROUNDS; i++) {
+ [pipe writeValue:anyValue];
+ }
+ [pipe finishWithError:nil];
+ pipe.state = GRXWriterStateStarted;
+ pipe = nil;
+}
+
@end
diff --git a/src/objective-c/tests/build_one_example.sh b/src/objective-c/tests/build_one_example.sh
index 576276096e..298d8e7043 100755
--- a/src/objective-c/tests/build_one_example.sh
+++ b/src/objective-c/tests/build_one_example.sh
@@ -37,9 +37,11 @@ rm -f Podfile.lock
pod install
set -o pipefail
-XCODEBUILD_FILTER='(^===|^\*\*|\bfatal\b|\berror\b|\bwarning\b|\bfail)'
+XCODEBUILD_FILTER='(^===|^\*\*|\bfatal\b|\berror\b|\bwarning\b|\bfail|\bpassed\b)'
xcodebuild \
build \
-workspace *.xcworkspace \
-scheme $SCHEME \
- -destination name="iPhone 6" | xcpretty
+ -destination name="iPhone 6" \
+ | egrep "$XCODEBUILD_FILTER" \
+ | egrep -v "(GPBDictionary|GPBArray)" -
diff --git a/src/objective-c/tests/run_tests.sh b/src/objective-c/tests/run_tests.sh
index 8fa9439284..d9c78ebac9 100755
--- a/src/objective-c/tests/run_tests.sh
+++ b/src/objective-c/tests/run_tests.sh
@@ -70,7 +70,7 @@ trap 'kill -9 `jobs -p` ; echo "EXIT TIME: $(date)"' EXIT
# element of the pipe fails.
# TODO(jcanizales): Use xctool instead? Issue #2540.
set -o pipefail
-XCODEBUILD_FILTER='(^===|^\*\*|\bfatal\b|\berror\b|\bwarning\b|\bfail)'
+XCODEBUILD_FILTER='(^===|^\*\*|\bfatal\b|\berror\b|\bwarning\b|\bfail|\bpassed\b)'
echo "TIME: $(date)"
xcodebuild \
-workspace Tests.xcworkspace \
@@ -79,14 +79,18 @@ xcodebuild \
HOST_PORT_LOCALSSL=localhost:5051 \
HOST_PORT_LOCAL=localhost:5050 \
HOST_PORT_REMOTE=grpc-test.sandbox.googleapis.com \
- test | xcpretty
+ test \
+ | egrep "$XCODEBUILD_FILTER" \
+ | egrep -v "(GPBDictionary|GPBArray)" -
echo "TIME: $(date)"
xcodebuild \
-workspace Tests.xcworkspace \
-scheme CoreCronetEnd2EndTests \
-destination name="iPhone 6" \
- test | xcpretty
+ test \
+ | egrep "$XCODEBUILD_FILTER" \
+ | egrep -v "(GPBDictionary|GPBArray)" -
# Temporarily disabled for (possible) flakiness on Jenkins.
# Fix or reenable after confirmation/disconfirmation that it is the source of
@@ -105,4 +109,6 @@ xcodebuild \
-scheme InteropTestsRemoteWithCronet \
-destination name="iPhone 6" \
HOST_PORT_REMOTE=grpc-test.sandbox.googleapis.com \
- test | xcpretty
+ test \
+ | egrep "$XCODEBUILD_FILTER" \
+ | egrep -v "(GPBDictionary|GPBArray)" -