aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/client_channel.c2
-rw-r--r--src/core/client_config/lb_policies/pick_first.c47
-rw-r--r--src/core/iomgr/closure.c24
-rw-r--r--src/core/iomgr/closure.h3
-rw-r--r--src/core/iomgr/fd_posix.c220
-rw-r--r--src/core/iomgr/fd_posix.h19
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c2
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c34
-rw-r--r--src/core/iomgr/pollset_posix.c145
-rw-r--r--src/core/iomgr/pollset_posix.h11
-rw-r--r--src/core/iomgr/tcp_server_windows.c2
-rw-r--r--src/core/iomgr/udp_server.c2
-rw-r--r--src/core/iomgr/udp_server.h3
-rw-r--r--src/core/surface/call.c20
-rw-r--r--src/core/transport/chttp2/bin_encoder.c16
-rw-r--r--src/core/tsi/fake_transport_security.c8
-rw-r--r--src/node/.istanbul.yml6
-rw-r--r--src/node/LICENSE28
-rw-r--r--src/node/README.md36
-rw-r--r--src/node/binding.gyp100
-rw-r--r--src/node/ext/call.cc5
-rw-r--r--src/node/ext/node_grpc.cc2
-rw-r--r--src/node/index.js3
-rw-r--r--src/node/interop/empty.proto43
-rw-r--r--src/node/interop/interop_client.js77
-rw-r--r--src/node/interop/interop_server.js52
-rw-r--r--src/node/interop/messages.proto132
-rw-r--r--src/node/interop/test.proto72
-rw-r--r--src/node/package.json58
-rw-r--r--src/node/src/client.js4
-rw-r--r--src/node/src/metadata.js5
-rw-r--r--src/node/src/server.js2
-rw-r--r--src/node/test/call_test.js2
-rw-r--r--src/node/test/channel_test.js2
-rw-r--r--src/node/test/constant_test.js2
-rw-r--r--src/node/test/end_to_end_test.js2
-rw-r--r--src/node/test/interop_sanity_test.js4
-rw-r--r--src/node/test/server_test.js2
-rw-r--r--src/node/test/surface_test.js2
39 files changed, 507 insertions, 692 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index b59b62a6aa..8e7cb27cfd 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -51,7 +51,7 @@
typedef struct call_data call_data;
-typedef struct {
+typedef struct client_channel_channel_data {
/** metadata context for this channel */
grpc_mdctx *mdctx;
/** resolver for this channel */
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 28155d0fbc..4b3aaab09c 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -101,6 +101,9 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
}
+ if (p->selected) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
+ }
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
gpr_mu_destroy(&p->mu);
@@ -172,6 +175,35 @@ void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
}
+static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ pick_first_lb_policy *p = arg;
+ size_t i;
+ grpc_transport_op op;
+ size_t num_subchannels = p->num_subchannels;
+ grpc_subchannel **subchannels;
+ grpc_subchannel *exclude_subchannel;
+
+ gpr_mu_lock(&p->mu);
+ subchannels = p->subchannels;
+ p->num_subchannels = 0;
+ p->subchannels = NULL;
+ exclude_subchannel = p->selected;
+ gpr_mu_unlock(&p->mu);
+ GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
+
+ for (i = 0; i < num_subchannels; i++) {
+ if (subchannels[i] != exclude_subchannel) {
+ memset(&op, 0, sizeof(op));
+ op.disconnect = 1;
+ grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
+ }
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
+ }
+
+ gpr_free(subchannels);
+}
+
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
pick_first_lb_policy *p = arg;
@@ -200,6 +232,11 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, "connecting_ready");
p->selected = p->subchannels[p->checking_subchannel];
+ GRPC_SUBCHANNEL_REF(p->selected, "picked_first");
+ /* drop the pick list: we are connected now */
+ GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels");
+ grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(destroy_subchannels, p), 1);
+ /* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
@@ -279,10 +316,15 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
size_t i;
size_t n;
grpc_subchannel **subchannels;
+ grpc_subchannel *selected;
gpr_mu_lock(&p->mu);
n = p->num_subchannels;
subchannels = gpr_malloc(n * sizeof(*subchannels));
+ selected = p->selected;
+ if (selected) {
+ GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
+ }
for (i = 0; i < n; i++) {
subchannels[i] = p->subchannels[i];
GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
@@ -290,9 +332,14 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
+ if (selected == subchannels[i]) continue;
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
}
+ if (p->selected) {
+ grpc_subchannel_process_transport_op(exec_ctx, selected, op);
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
+ }
gpr_free(subchannels);
}
diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c
index 3265425789..d91681990f 100644
--- a/src/core/iomgr/closure.c
+++ b/src/core/iomgr/closure.c
@@ -33,6 +33,8 @@
#include "src/core/iomgr/closure.h"
+#include <grpc/support/alloc.h>
+
void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg) {
closure->cb = cb;
@@ -69,3 +71,25 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) {
}
src->head = src->tail = NULL;
}
+
+typedef struct {
+ grpc_iomgr_cb_func cb;
+ void *cb_arg;
+ grpc_closure wrapper;
+} wrapped_closure;
+
+static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ wrapped_closure *wc = arg;
+ grpc_iomgr_cb_func cb = wc->cb;
+ void *cb_arg = wc->cb_arg;
+ gpr_free(wc);
+ cb(exec_ctx, cb_arg, success);
+}
+
+grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
+ wrapped_closure *wc = gpr_malloc(sizeof(*wc));
+ wc->cb = cb;
+ wc->cb_arg = cb_arg;
+ grpc_closure_init(&wc->wrapper, closure_wrapper, wc);
+ return &wc->wrapper;
+}
diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h
index 982ffa4e1b..d812659af0 100644
--- a/src/core/iomgr/closure.h
+++ b/src/core/iomgr/closure.h
@@ -77,6 +77,9 @@ struct grpc_closure {
void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg);
+/* Create a heap allocated closure: try to avoid except for very rare events */
+grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
+
#define GRPC_CLOSURE_LIST_INIT \
{ NULL, NULL }
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index b48b7f050a..231bc988a8 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -45,10 +45,8 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
-enum descriptor_state {
- NOT_READY = 0,
- READY = 1
-}; /* or a pointer to a closure to call */
+#define CLOSURE_NOT_READY ((grpc_closure *)0)
+#define CLOSURE_READY ((grpc_closure *)1)
/* We need to keep a freelist not because of any concerns of malloc performance
* but instead so that implementations with multiple threads in (for example)
@@ -88,14 +86,13 @@ static grpc_fd *alloc_fd(int fd) {
gpr_mu_unlock(&fd_freelist_mu);
if (r == NULL) {
r = gpr_malloc(sizeof(grpc_fd));
- gpr_mu_init(&r->set_state_mu);
- gpr_mu_init(&r->watcher_mu);
+ gpr_mu_init(&r->mu);
}
gpr_atm_rel_store(&r->refst, 1);
- gpr_atm_rel_store(&r->readst, NOT_READY);
- gpr_atm_rel_store(&r->writest, NOT_READY);
- gpr_atm_rel_store(&r->shutdown, 0);
+ r->shutdown = 0;
+ r->read_closure = CLOSURE_NOT_READY;
+ r->write_closure = CLOSURE_NOT_READY;
r->fd = fd;
r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
&r->inactive_watcher_root;
@@ -107,8 +104,7 @@ static grpc_fd *alloc_fd(int fd) {
}
static void destroy(grpc_fd *fd) {
- gpr_mu_destroy(&fd->set_state_mu);
- gpr_mu_destroy(&fd->watcher_mu);
+ gpr_mu_destroy(&fd->mu);
gpr_free(fd);
}
@@ -173,39 +169,35 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
-static void pollset_kick_locked(grpc_pollset *pollset) {
- gpr_mu_lock(GRPC_POLLSET_MU(pollset));
- grpc_pollset_kick(pollset, NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(pollset));
+static void pollset_kick_locked(grpc_fd_watcher *watcher) {
+ gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset));
+ GPR_ASSERT(watcher->worker);
+ grpc_pollset_kick_ext(watcher->pollset, watcher->worker,
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
+ gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset));
}
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
- pollset_kick_locked(fd->inactive_watcher_root.next->pollset);
+ pollset_kick_locked(fd->inactive_watcher_root.next);
} else if (fd->read_watcher) {
- pollset_kick_locked(fd->read_watcher->pollset);
+ pollset_kick_locked(fd->read_watcher);
} else if (fd->write_watcher) {
- pollset_kick_locked(fd->write_watcher->pollset);
+ pollset_kick_locked(fd->write_watcher);
}
}
-static void maybe_wake_one_watcher(grpc_fd *fd) {
- gpr_mu_lock(&fd->watcher_mu);
- maybe_wake_one_watcher_locked(fd);
- gpr_mu_unlock(&fd->watcher_mu);
-}
-
static void wake_all_watchers_locked(grpc_fd *fd) {
grpc_fd_watcher *watcher;
for (watcher = fd->inactive_watcher_root.next;
watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
- pollset_kick_locked(watcher->pollset);
+ pollset_kick_locked(watcher);
}
if (fd->read_watcher) {
- pollset_kick_locked(fd->read_watcher->pollset);
+ pollset_kick_locked(fd->read_watcher);
}
if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
- pollset_kick_locked(fd->write_watcher->pollset);
+ pollset_kick_locked(fd->write_watcher);
}
}
@@ -218,7 +210,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
const char *reason) {
fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
- gpr_mu_lock(&fd->watcher_mu);
+ gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
if (!has_watchers(fd)) {
fd->closed = 1;
@@ -227,7 +219,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
} else {
wake_all_watchers_locked(fd);
}
- gpr_mu_unlock(&fd->watcher_mu);
+ gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* drop the reference */
}
@@ -247,136 +239,121 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
-static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *st,
- grpc_closure *closure) {
- switch (gpr_atm_acq_load(st)) {
- case NOT_READY:
- /* There is no race if the descriptor is already ready, so we skip
- the interlocked op in that case. As long as the app doesn't
- try to set the same upcall twice (which it shouldn't) then
- oldval should never be anything other than READY or NOT_READY. We
- don't
- check for user error on the fast path. */
- if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr)closure)) {
- /* swap was successful -- the closure will run after the next
- set_ready call. NOTE: we don't have an ABA problem here,
- since we should never have concurrent calls to the same
- notify_on function. */
- maybe_wake_one_watcher(fd);
- return;
- }
- /* swap was unsuccessful due to an intervening set_ready call.
- Fall through to the READY code below */
- case READY:
- GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
- gpr_atm_rel_store(st, NOT_READY);
- grpc_exec_ctx_enqueue(exec_ctx, closure,
- !gpr_atm_acq_load(&fd->shutdown));
- return;
- default: /* WAITING */
- /* upcallptr was set to a different closure. This is an error! */
- gpr_log(GPR_ERROR,
- "User called a notify_on function with a previous callback still "
- "pending");
- abort();
+static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st, grpc_closure *closure) {
+ if (*st == CLOSURE_NOT_READY) {
+ /* not ready ==> switch to a waiting state by setting the closure */
+ *st = closure;
+ } else if (*st == CLOSURE_READY) {
+ /* already ready ==> queue the closure to run immediately */
+ *st = CLOSURE_NOT_READY;
+ grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown);
+ maybe_wake_one_watcher_locked(fd);
+ } else {
+ /* upcallptr was set to a different closure. This is an error! */
+ gpr_log(GPR_ERROR,
+ "User called a notify_on function with a previous callback still "
+ "pending");
+ abort();
}
- gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
- abort();
}
-static void set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- gpr_atm *st) {
- gpr_intptr state = gpr_atm_acq_load(st);
-
- switch (state) {
- case READY:
- /* duplicate ready, ignore */
- return;
- case NOT_READY:
- if (gpr_atm_rel_cas(st, NOT_READY, READY)) {
- /* swap was successful -- the closure will run after the next
- notify_on call. */
- return;
- }
- /* swap was unsuccessful due to an intervening set_ready call.
- Fall through to the WAITING code below */
- state = gpr_atm_acq_load(st);
- default: /* waiting */
- GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
- gpr_atm_no_barrier_load(st) != NOT_READY);
- grpc_exec_ctx_enqueue(exec_ctx, (grpc_closure *)state,
- !gpr_atm_acq_load(&fd->shutdown));
- gpr_atm_rel_store(st, NOT_READY);
- return;
+/* returns 1 if state becomes not ready */
+static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st) {
+ if (*st == CLOSURE_READY) {
+ /* duplicate ready ==> ignore */
+ return 0;
+ } else if (*st == CLOSURE_NOT_READY) {
+ /* not ready, and not waiting ==> flag ready */
+ *st = CLOSURE_READY;
+ return 0;
+ } else {
+ /* waiting ==> queue closure */
+ grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown);
+ *st = CLOSURE_NOT_READY;
+ return 1;
}
}
-static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *st) {
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
- gpr_mu_lock(&fd->set_state_mu);
+ gpr_mu_lock(&fd->mu);
set_ready_locked(exec_ctx, fd, st);
- gpr_mu_unlock(&fd->set_state_mu);
+ gpr_mu_unlock(&fd->mu);
}
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- gpr_mu_lock(&fd->set_state_mu);
+ gpr_mu_lock(&fd->mu);
GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
- gpr_atm_rel_store(&fd->shutdown, 1);
- set_ready_locked(exec_ctx, fd, &fd->readst);
- set_ready_locked(exec_ctx, fd, &fd->writest);
- gpr_mu_unlock(&fd->set_state_mu);
+ fd->shutdown = 1;
+ set_ready_locked(exec_ctx, fd, &fd->read_closure);
+ set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ gpr_mu_unlock(&fd->mu);
}
void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- notify_on(exec_ctx, fd, &fd->readst, closure);
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
+ gpr_mu_unlock(&fd->mu);
}
void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- notify_on(exec_ctx, fd, &fd->writest, closure);
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
+ gpr_mu_unlock(&fd->mu);
}
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- gpr_uint32 read_mask, gpr_uint32 write_mask,
- grpc_fd_watcher *watcher) {
+ grpc_pollset_worker *worker, gpr_uint32 read_mask,
+ gpr_uint32 write_mask, grpc_fd_watcher *watcher) {
gpr_uint32 mask = 0;
+ grpc_closure *cur;
+ int requested;
/* keep track of pollers that have requested our events, in case they change
*/
GRPC_FD_REF(fd, "poll");
- gpr_mu_lock(&fd->watcher_mu);
+ gpr_mu_lock(&fd->mu);
+
/* if we are shutdown, then don't add to the watcher set */
if (gpr_atm_no_barrier_load(&fd->shutdown)) {
watcher->fd = NULL;
watcher->pollset = NULL;
- gpr_mu_unlock(&fd->watcher_mu);
+ watcher->worker = NULL;
+ gpr_mu_unlock(&fd->mu);
GRPC_FD_UNREF(fd, "poll");
return 0;
}
+
/* if there is nobody polling for read, but we need to, then start doing so */
- if (read_mask && !fd->read_watcher &&
- (gpr_uintptr)gpr_atm_acq_load(&fd->readst) > READY) {
+ cur = fd->read_closure;
+ requested = cur != CLOSURE_READY;
+ if (read_mask && fd->read_watcher == NULL && requested) {
fd->read_watcher = watcher;
mask |= read_mask;
}
/* if there is nobody polling for write, but we need to, then start doing so
*/
- if (write_mask && !fd->write_watcher &&
- (gpr_uintptr)gpr_atm_acq_load(&fd->writest) > READY) {
+ cur = fd->write_closure;
+ requested = cur != CLOSURE_READY;
+ if (write_mask && fd->write_watcher == NULL && requested) {
fd->write_watcher = watcher;
mask |= write_mask;
}
/* if not polling, remember this watcher in case we need someone to later */
- if (mask == 0) {
+ if (mask == 0 && worker != NULL) {
watcher->next = &fd->inactive_watcher_root;
watcher->prev = watcher->next->prev;
watcher->next->prev = watcher->prev->next = watcher;
}
watcher->pollset = pollset;
+ watcher->worker = worker;
watcher->fd = fd;
- gpr_mu_unlock(&fd->watcher_mu);
+ gpr_mu_unlock(&fd->mu);
return mask;
}
@@ -391,24 +368,39 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
return;
}
- gpr_mu_lock(&fd->watcher_mu);
+ gpr_mu_lock(&fd->mu);
+
if (watcher == fd->read_watcher) {
/* remove read watcher, kick if we still need a read */
was_polling = 1;
- kick = kick || !got_read;
+ if (!got_read) {
+ kick = 1;
+ }
fd->read_watcher = NULL;
}
if (watcher == fd->write_watcher) {
/* remove write watcher, kick if we still need a write */
was_polling = 1;
- kick = kick || !got_write;
+ if (!got_write) {
+ kick = 1;
+ }
fd->write_watcher = NULL;
}
- if (!was_polling) {
+ if (!was_polling && watcher->worker != NULL) {
/* remove from inactive list */
watcher->next->prev = watcher->prev;
watcher->prev->next = watcher->next;
}
+ if (got_read) {
+ if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
+ kick = 1;
+ }
+ }
+ if (got_write) {
+ if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
+ kick = 1;
+ }
+ }
if (kick) {
maybe_wake_one_watcher_locked(fd);
}
@@ -417,17 +409,17 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
close(fd->fd);
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
}
- gpr_mu_unlock(&fd->watcher_mu);
+ gpr_mu_unlock(&fd->mu);
GRPC_FD_UNREF(fd, "poll");
}
void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->readst);
+ set_ready(exec_ctx, fd, &fd->read_closure);
}
void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->writest);
+ set_ready(exec_ctx, fd, &fd->write_closure);
}
#endif
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 089aa4d717..dc917ebbc0 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -46,6 +46,7 @@ typedef struct grpc_fd_watcher {
struct grpc_fd_watcher *next;
struct grpc_fd_watcher *prev;
grpc_pollset *pollset;
+ grpc_pollset_worker *worker;
grpc_fd *fd;
} grpc_fd_watcher;
@@ -58,8 +59,8 @@ struct grpc_fd {
and just unref by 1 when we're ready to flag the object as orphaned */
gpr_atm refst;
- gpr_mu set_state_mu;
- gpr_atm shutdown;
+ gpr_mu mu;
+ int shutdown;
int closed;
/* The watcher list.
@@ -84,18 +85,16 @@ struct grpc_fd {
If at a later time there becomes need of a poller to poll, one of
the inactive pollers may be kicked out of their poll loops to take
that responsibility. */
- gpr_mu watcher_mu;
grpc_fd_watcher inactive_watcher_root;
grpc_fd_watcher *read_watcher;
grpc_fd_watcher *write_watcher;
- gpr_atm readst;
- gpr_atm writest;
+ grpc_closure *read_closure;
+ grpc_closure *write_closure;
struct grpc_fd *freelist_next;
grpc_closure *on_done_closure;
- grpc_closure *shutdown_closures[2];
grpc_iomgr_object iomgr_object;
};
@@ -126,10 +125,12 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
fd's current interest (such as epoll) do not need to call this function.
MUST NOT be called with a pollset lock taken */
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- gpr_uint32 read_mask, gpr_uint32 write_mask,
- grpc_fd_watcher *rec);
+ grpc_pollset_worker *worker, gpr_uint32 read_mask,
+ gpr_uint32 write_mask, grpc_fd_watcher *rec);
/* Complete polling previously started with grpc_fd_begin_poll
- MUST NOT be called with a pollset lock taken */
+ MUST NOT be called with a pollset lock taken
+ if got_read or got_write are 1, also does the become_{readable,writable} as
+ appropriate. */
void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
int got_read, int got_write);
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index faf0a6362b..ba9ba73f9d 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -72,7 +72,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* We pretend to be polling whilst adding an fd to keep the fd from being
closed during the add. This may result in a spurious wakeup being assigned
to this pollset whilst adding, but that should be benign. */
- GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0);
+ GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0);
if (watcher.fd != NULL) {
ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
ev.data.ptr = fd;
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 1356ebe7a0..faa6c14491 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -102,6 +102,9 @@ static void multipoll_with_poll_pollset_del_fd(grpc_exec_ctx *exec_ctx,
static void multipoll_with_poll_pollset_maybe_work_and_unlock(
grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now) {
+#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
+#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
+
int timeout;
int r;
size_t i, j, fd_count;
@@ -147,8 +150,8 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
gpr_mu_unlock(&pollset->mu);
for (i = 2; i < pfd_count; i++) {
- pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN,
- POLLOUT, &watchers[i]);
+ pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker,
+ POLLIN, POLLOUT, &watchers[i]);
}
/* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
@@ -157,34 +160,29 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
r = grpc_poll_function(pfds, pfd_count, timeout);
GRPC_SCHEDULING_END_BLOCKING_REGION;
- for (i = 2; i < pfd_count; i++) {
- grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN,
- pfds[i].revents & POLLOUT);
- }
-
if (r < 0) {
- if (errno != EINTR) {
- gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ for (i = 2; i < pfd_count; i++) {
+ grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0);
}
} else if (r == 0) {
- /* do nothing */
+ for (i = 2; i < pfd_count; i++) {
+ grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ }
} else {
- if (pfds[0].revents & POLLIN) {
+ if (pfds[0].revents & POLLIN_CHECK) {
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
}
- if (pfds[1].revents & POLLIN) {
+ if (pfds[1].revents & POLLIN_CHECK) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
+ grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0);
continue;
}
- if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
- grpc_fd_become_readable(exec_ctx, watchers[i].fd);
- }
- if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
- grpc_fd_become_writable(exec_ctx, watchers[i].fd);
- }
+ grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
+ pfds[i].revents & POLLOUT_CHECK);
}
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index b663780a02..d056866d13 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -98,31 +98,63 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next->prev = worker;
}
-void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+void grpc_pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ gpr_uint32 flags) {
/* pollset->mu already held */
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
for (specific_worker = p->root_worker.next;
specific_worker != &p->root_worker;
specific_worker = specific_worker->next) {
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
}
p->kicked_without_pollers = 1;
+ return;
} else if (gpr_tls_get(&g_current_thread_worker) !=
(gpr_intptr)specific_worker) {
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = 1;
+ }
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ return;
+ } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = 1;
+ }
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ return;
}
} else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
specific_worker = pop_front_worker(p);
if (specific_worker != NULL) {
+ if (gpr_tls_get(&g_current_thread_worker) ==
+ (gpr_intptr)specific_worker) {
+ push_back_worker(p, specific_worker);
+ specific_worker = pop_front_worker(p);
+ if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
+ gpr_tls_get(&g_current_thread_worker) ==
+ (gpr_intptr)specific_worker) {
+ push_back_worker(p, specific_worker);
+ return;
+ }
+ }
push_back_worker(p, specific_worker);
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ return;
} else {
p->kicked_without_pollers = 1;
+ return;
}
}
}
+void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+ grpc_pollset_kick_ext(p, specific_worker, 0);
+}
+
/* global state management */
void grpc_pollset_global_init(void) {
@@ -195,52 +227,88 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* pollset->mu already held */
int added_worker = 0;
int locked = 1;
+ int queued_work = 0;
+ int keep_polling = 0;
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
+ worker->reevaluate_polling_on_wakeup = 0;
/* TODO(ctiller): pool these */
grpc_wakeup_fd_init(&worker->wakeup_fd);
+ /* If there's work waiting for the pollset to be idle, and the
+ pollset is idle, then do that work */
if (!grpc_pollset_has_workers(pollset) &&
!grpc_closure_list_empty(pollset->idle_jobs)) {
grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
goto done;
}
+ /* Check alarms - these are a global resource so we just ping
+ each time through on every pollset.
+ May update deadline to ensure timely wakeups.
+ TODO(ctiller): can this work be localized? */
if (grpc_alarm_check(exec_ctx, now, &deadline)) {
gpr_mu_unlock(&pollset->mu);
locked = 0;
goto done;
}
+ /* If we're shutting down then we don't execute any extended work */
if (pollset->shutting_down) {
goto done;
}
+ /* Give do_promote priority so we don't starve it out */
if (pollset->in_flight_cbs) {
- /* Give do_promote priority so we don't starve it out */
gpr_mu_unlock(&pollset->mu);
locked = 0;
goto done;
}
- if (!pollset->kicked_without_pollers) {
- push_front_worker(pollset, worker);
- added_worker = 1;
- gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
- gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker);
- pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, deadline,
- now);
- locked = 0;
- gpr_tls_set(&g_current_thread_poller, 0);
- gpr_tls_set(&g_current_thread_worker, 0);
- } else {
- pollset->kicked_without_pollers = 0;
- }
-done:
- if (!locked) {
- grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->mu);
- locked = 1;
+ /* Start polling, and keep doing so while we're being asked to
+ re-evaluate our pollers (this allows poll() based pollers to
+ ensure they don't miss wakeups) */
+ keep_polling = 1;
+ while (keep_polling) {
+ keep_polling = 0;
+ if (!pollset->kicked_without_pollers) {
+ if (!added_worker) {
+ push_front_worker(pollset, worker);
+ added_worker = 1;
+ }
+ gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
+ gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker);
+ pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker,
+ deadline, now);
+ locked = 0;
+ gpr_tls_set(&g_current_thread_poller, 0);
+ gpr_tls_set(&g_current_thread_worker, 0);
+ } else {
+ pollset->kicked_without_pollers = 0;
+ }
+ /* Finished execution - start cleaning up.
+ Note that we may arrive here from outside the enclosing while() loop.
+ In that case we won't loop though as we haven't added worker to the
+ worker list, which means nobody could ask us to re-evaluate polling). */
+ done:
+ if (!locked) {
+ queued_work |= grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ locked = 1;
+ }
+ /* If we're forced to re-evaluate polling (via grpc_pollset_kick with
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
+ a loop */
+ if (worker->reevaluate_polling_on_wakeup) {
+ worker->reevaluate_polling_on_wakeup = 0;
+ pollset->kicked_without_pollers = 0;
+ if (queued_work) {
+ /* If there's queued work on the list, then set the deadline to be
+ immediate so we get back out of the polling loop quickly */
+ deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ }
+ keep_polling = 1;
+ }
}
- grpc_wakeup_fd_destroy(&worker->wakeup_fd);
if (added_worker) {
remove_worker(pollset, worker);
}
+ grpc_wakeup_fd_destroy(&worker->wakeup_fd);
if (pollset->shutting_down) {
if (grpc_pollset_has_workers(pollset)) {
grpc_pollset_kick(pollset, NULL);
@@ -454,6 +522,9 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset_worker *worker,
gpr_timespec deadline,
gpr_timespec now) {
+#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
+#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
+
struct pollfd pfd[3];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
@@ -479,8 +550,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
pfd[2].revents = 0;
GRPC_FD_REF(fd, "basicpoll_begin");
gpr_mu_unlock(&pollset->mu);
- pfd[2].events =
- (short)grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
+ pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN,
+ POLLOUT, &fd_watcher);
if (pfd[2].events != 0) {
nfds++;
}
@@ -497,31 +568,27 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
GRPC_SCHEDULING_END_BLOCKING_REGION;
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
- if (fd) {
- grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN,
- pfd[2].revents & POLLOUT);
- }
-
if (r < 0) {
- if (errno != EINTR) {
- gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ if (fd) {
+ grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
}
} else if (r == 0) {
- /* do nothing */
+ if (fd) {
+ grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+ }
} else {
- if (pfd[0].revents & POLLIN) {
+ if (pfd[0].revents & POLLIN_CHECK) {
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
}
- if (pfd[1].revents & POLLIN) {
+ if (pfd[1].revents & POLLIN_CHECK) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
if (nfds > 2) {
- if (pfd[2].revents & (POLLIN | POLLHUP | POLLERR)) {
- grpc_fd_become_readable(exec_ctx, fd);
- }
- if (pfd[2].revents & (POLLOUT | POLLHUP | POLLERR)) {
- grpc_fd_become_writable(exec_ctx, fd);
- }
+ grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
+ pfd[2].revents & POLLOUT_CHECK);
+ } else if (fd) {
+ grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
}
}
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 83c5258539..34f76db2af 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -50,6 +50,7 @@ struct grpc_fd;
typedef struct grpc_pollset_worker {
grpc_wakeup_fd wakeup_fd;
+ int reevaluate_polling_on_wakeup;
struct grpc_pollset_worker *next;
struct grpc_pollset_worker *prev;
} grpc_pollset_worker;
@@ -111,6 +112,16 @@ void grpc_kick_drain(grpc_pollset *p);
int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now);
+/* Allow kick to wakeup the currently polling worker */
+#define GRPC_POLLSET_CAN_KICK_SELF 1
+/* Force the wakee to repoll when awoken */
+#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
+/* As per grpc_pollset_kick, with an extended set of flags (defined above)
+ -- mostly for fd_posix's use. */
+void grpc_pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ gpr_uint32 flags);
+
/* turn a pollset into a multipoller: platform specific */
typedef void (*grpc_platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index db3319b3c6..3fea8b5b35 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -336,6 +336,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
peer_name_string);
gpr_free(fd_name);
gpr_free(peer_name_string);
+ } else {
+ closesocket(sock);
}
}
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index 1304f2067e..9903e970e6 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -278,7 +278,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
/* Tell the registered callback that data is available to read. */
GPR_ASSERT(sp->read_cb);
- sp->read_cb(sp->emfd, sp->server->grpc_server);
+ sp->read_cb(exec_ctx, sp->emfd, sp->server->grpc_server);
/* Re-arm the notification event so we get another chance to read. */
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h
index dbbe097109..de5736c426 100644
--- a/src/core/iomgr/udp_server.h
+++ b/src/core/iomgr/udp_server.h
@@ -43,7 +43,8 @@ typedef struct grpc_server grpc_server;
typedef struct grpc_udp_server grpc_udp_server;
/* Called when data is available to read from the socket. */
-typedef void (*grpc_udp_server_read_cb)(grpc_fd *emfd, grpc_server *server);
+typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
+ grpc_server *server);
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void);
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index d15a3bcbad..07c3ff6ae4 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -425,7 +425,12 @@ static grpc_cq_completion *allocate_completion(grpc_call *call) {
if (call->allocated_completions & (1u << i)) {
continue;
}
- call->allocated_completions |= (gpr_uint8)(1u << i);
+ /* NB: the following integer arithmetic operation needs to be in its
+ * expanded form due to the "integral promotion" performed (see section
+ * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
+ * is then required to avoid the compiler warning */
+ call->allocated_completions =
+ (gpr_uint8)(call->allocated_completions | (1u << i));
gpr_mu_unlock(&call->completion_mu);
return &call->completions[i];
}
@@ -736,7 +741,11 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
size_t i;
/* ioreq is live: we need to do something */
master = &call->masters[master_set];
- master->complete_mask |= (gpr_uint16)(1u << op);
+ /* NB: the following integer arithmetic operation needs to be in its
+ * expanded form due to the "integral promotion" performed (see section
+ * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
+ * is then required to avoid the compiler warning */
+ master->complete_mask = (gpr_uint16)(master->complete_mask | (1u << op));
if (!success) {
master->success = 0;
}
@@ -927,6 +936,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
}
/* we have to be reading a message to know what to do here */
if (!call->reading_message) {
+ gpr_slice_unref(slice);
cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT,
"Received payload data while not reading a message");
return 0;
@@ -1246,7 +1256,11 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
GRPC_MDSTR_REF(reqs[i].data.send_status.details));
}
}
- have_ops |= (gpr_uint16)(1u << op);
+ /* NB: the following integer arithmetic operation needs to be in its
+ * expanded form due to the "integral promotion" performed (see section
+ * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
+ * is then required to avoid the compiler warning */
+ have_ops = (gpr_uint16)(have_ops | (1u << op));
call->request_data[op] = data;
call->request_flags[op] = reqs[i].flags;
diff --git a/src/core/transport/chttp2/bin_encoder.c b/src/core/transport/chttp2/bin_encoder.c
index f1bbf9aa91..9c9070ede4 100644
--- a/src/core/transport/chttp2/bin_encoder.c
+++ b/src/core/transport/chttp2/bin_encoder.c
@@ -185,8 +185,12 @@ gpr_slice grpc_chttp2_huffman_compress(gpr_slice input) {
}
if (temp_length) {
- *out++ = (gpr_uint8)(temp << (8u - temp_length)) |
- (gpr_uint8)(0xffu >> temp_length);
+ /* NB: the following integer arithmetic operation needs to be in its
+ * expanded form due to the "integral promotion" performed (see section
+ * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
+ * is then required to avoid the compiler warning */
+ *out++ = (gpr_uint8)((gpr_uint8)(temp << (8u - temp_length)) |
+ (gpr_uint8)(0xffu >> temp_length));
}
GPR_ASSERT(out == GPR_SLICE_END_PTR(output));
@@ -265,8 +269,12 @@ gpr_slice grpc_chttp2_base64_encode_and_huffman_compress(gpr_slice input) {
}
if (out.temp_length) {
- *out.out++ = (gpr_uint8)(out.temp << (8u - out.temp_length)) |
- (gpr_uint8)(0xffu >> out.temp_length);
+ /* NB: the following integer arithmetic operation needs to be in its
+ * expanded form due to the "integral promotion" performed (see section
+ * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
+ * is then required to avoid the compiler warning */
+ *out.out++ = (gpr_uint8)((gpr_uint8)(out.temp << (8u - out.temp_length)) |
+ (gpr_uint8)(0xffu >> out.temp_length));
}
GPR_ASSERT(out.out <= GPR_SLICE_END_PTR(output));
diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c
index cbb6f17ae1..a40268a7f0 100644
--- a/src/core/tsi/fake_transport_security.c
+++ b/src/core/tsi/fake_transport_security.c
@@ -118,10 +118,10 @@ static gpr_uint32 load32_little_endian(const unsigned char *buf) {
}
static void store32_little_endian(gpr_uint32 value, unsigned char *buf) {
- buf[3] = (unsigned char)(value >> 24) & 0xFF;
- buf[2] = (unsigned char)(value >> 16) & 0xFF;
- buf[1] = (unsigned char)(value >> 8) & 0xFF;
- buf[0] = (unsigned char)(value)&0xFF;
+ buf[3] = (unsigned char)((value >> 24) & 0xFF);
+ buf[2] = (unsigned char)((value >> 16) & 0xFF);
+ buf[1] = (unsigned char)((value >> 8) & 0xFF);
+ buf[0] = (unsigned char)((value) & 0xFF);
}
static void tsi_fake_frame_reset(tsi_fake_frame *frame, int needs_draining) {
diff --git a/src/node/.istanbul.yml b/src/node/.istanbul.yml
deleted file mode 100644
index 9ff1379f51..0000000000
--- a/src/node/.istanbul.yml
+++ /dev/null
@@ -1,6 +0,0 @@
-reporting:
- watermarks:
- statements: [80, 95]
- lines: [80, 95]
- functions: [80, 95]
- branches: [80, 95]
diff --git a/src/node/LICENSE b/src/node/LICENSE
deleted file mode 100644
index 0209b570e1..0000000000
--- a/src/node/LICENSE
+++ /dev/null
@@ -1,28 +0,0 @@
-Copyright 2015, Google Inc.
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are
-met:
-
- * Redistributions of source code must retain the above copyright
-notice, this list of conditions and the following disclaimer.
- * Redistributions in binary form must reproduce the above
-copyright notice, this list of conditions and the following disclaimer
-in the documentation and/or other materials provided with the
-distribution.
- * Neither the name of Google Inc. nor the names of its
-contributors may be used to endorse or promote products derived from
-this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/src/node/README.md b/src/node/README.md
index 7719d08290..5d89e2228d 100644
--- a/src/node/README.md
+++ b/src/node/README.md
@@ -5,51 +5,19 @@ Beta
## PREREQUISITES
- `node`: This requires `node` to be installed. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package.
-- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core.
## INSTALLATION
-**Linux (Debian):**
-
-Add [Debian jessie-backports][] to your `sources.list` file. Example:
-
-```sh
-echo "deb http://http.debian.net/debian jessie-backports main" | \
-sudo tee -a /etc/apt/sources.list
-```
-
-Install the gRPC Debian package
-
-```sh
-sudo apt-get update
-sudo apt-get install libgrpc-dev
-```
-
Install the gRPC NPM package
```sh
npm install grpc
```
-**Mac OS X**
-
-Install [homebrew][]. Run the following command to install gRPC Node.js.
-```sh
-$ curl -fsSL https://goo.gl/getgrpc | bash -s nodejs
-```
-This will download and run the [gRPC install script][], then install the latest version of gRPC Nodejs npm package.
-
## BUILD FROM SOURCE
1. Clone [the grpc Git Repository](https://github.com/grpc/grpc).
- 2. Follow the instructions in the `INSTALL` file in the root of that repository to install the C core library that this package depends on.
3. Run `npm install`.
-If you install the gRPC C core library in a custom location, then you need to set some environment variables to install this library. The command will look like this:
-
-```sh
-CXXFLAGS=-I<custom location>/include LDFLAGS=-L<custom location>/lib npm install [grpc]
-```
-
## TESTING
To run the test suite, simply run `npm test` in the install location.
@@ -110,7 +78,3 @@ ServerCredentials
```
An object with factory methods for creating credential objects for servers.
-
-[homebrew]:http://brew.sh
-[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
-[Debian jessie-backports]:http://backports.debian.org/Instructions/
diff --git a/src/node/binding.gyp b/src/node/binding.gyp
deleted file mode 100644
index 247719e981..0000000000
--- a/src/node/binding.gyp
+++ /dev/null
@@ -1,100 +0,0 @@
-{
- "variables" : {
- 'config': '<!(echo $CONFIG)'
- },
- "targets" : [
- {
- 'include_dirs': [
- "<!(node -e \"require('nan')\")"
- ],
- 'cflags': [
- '-std=c++0x',
- '-Wall',
- '-pthread',
- '-g',
- '-zdefs',
- '-Werror',
- '-Wno-error=deprecated-declarations'
- ],
- 'ldflags': [
- '-g'
- ],
- "conditions": [
- ['OS != "win"', {
- 'variables': {
- 'pkg_config_grpc': '<!(pkg-config --exists grpc >/dev/null 2>&1 && echo true || echo false)'
- },
- 'conditions': [
- ['config=="gcov"', {
- 'cflags': [
- '-ftest-coverage',
- '-fprofile-arcs',
- '-O0'
- ],
- 'ldflags': [
- '-ftest-coverage',
- '-fprofile-arcs'
- ]
- }
- ],
- ['pkg_config_grpc == "true"', {
- 'link_settings': {
- 'libraries': [
- '<!@(pkg-config --libs-only-l --static grpc)'
- ]
- },
- 'cflags': [
- '<!@(pkg-config --cflags grpc)'
- ],
- 'libraries': [
- '<!@(pkg-config --libs-only-L --static grpc)'
- ],
- 'ldflags': [
- '<!@(pkg-config --libs-only-other --static grpc)'
- ]
- }, {
- 'link_settings': {
- 'libraries': [
- '-lpthread',
- '-lgrpc',
- '-lgpr'
- ],
- },
- 'conditions':[
- ['OS != "mac"', {
- 'link_settings': {
- 'libraries': [
- '-lrt'
- ]
- }
- }]
- ]
- }
- ]
- ]
- }],
- ['OS == "mac"', {
- 'xcode_settings': {
- 'MACOSX_DEPLOYMENT_TARGET': '10.9',
- 'OTHER_CFLAGS': [
- '-std=c++11',
- '-stdlib=libc++'
- ]
- }
- }]
- ],
- "target_name": "grpc",
- "sources": [
- "ext/byte_buffer.cc",
- "ext/call.cc",
- "ext/channel.cc",
- "ext/completion_queue_async_worker.cc",
- "ext/credentials.cc",
- "ext/node_grpc.cc",
- "ext/server.cc",
- "ext/server_credentials.cc",
- "ext/timeval.cc"
- ]
- }
- ]
-}
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index b08a9f96d8..f98fe2463b 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -168,8 +168,9 @@ Local<Value> ParseMetadata(const grpc_metadata_array *metadata_array) {
}
if (EndsWith(elem->key, "-bin")) {
Nan::Set(array, index_map[elem->key],
- Nan::CopyBuffer(elem->value,
- elem->value_length).ToLocalChecked());
+ MakeFastBuffer(
+ Nan::CopyBuffer(elem->value,
+ elem->value_length).ToLocalChecked()));
} else {
Nan::Set(array, index_map[elem->key],
Nan::New(elem->value).ToLocalChecked());
diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc
index caca0fc452..8ea0c64b99 100644
--- a/src/node/ext/node_grpc.cc
+++ b/src/node/ext/node_grpc.cc
@@ -247,4 +247,4 @@ void init(Local<Object> exports) {
grpc::node::ServerCredentials::Init(exports);
}
-NODE_MODULE(grpc, init)
+NODE_MODULE(grpc_node, init)
diff --git a/src/node/index.js b/src/node/index.js
index 02b73f66ee..c49fdc8431 100644
--- a/src/node/index.js
+++ b/src/node/index.js
@@ -43,7 +43,7 @@ var server = require('./src/server.js');
var Metadata = require('./src/metadata.js');
-var grpc = require('bindings')('grpc');
+var grpc = require('bindings')('grpc_node');
/**
* Load a gRPC object from an existing ProtoBuf.Reflect object.
@@ -90,7 +90,6 @@ exports.load = function load(filename, format) {
default:
throw new Error('Unrecognized format "' + format + '"');
}
-
return loadObject(builder.ns);
};
diff --git a/src/node/interop/empty.proto b/src/node/interop/empty.proto
deleted file mode 100644
index 6d0eb937d6..0000000000
--- a/src/node/interop/empty.proto
+++ /dev/null
@@ -1,43 +0,0 @@
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-syntax = "proto3";
-
-package grpc.testing;
-
-// An empty message that you can re-use to avoid defining duplicated empty
-// messages in your project. A typical example is to use it as argument or the
-// return value of a service API. For instance:
-//
-// service Foo {
-// rpc Bar (grpc.testing.Empty) returns (grpc.testing.Empty) { };
-// };
-//
-message Empty {}
diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js
index 215d42121c..da614ad7b9 100644
--- a/src/node/interop/interop_client.js
+++ b/src/node/interop/interop_client.js
@@ -37,7 +37,9 @@ var fs = require('fs');
var path = require('path');
var _ = require('lodash');
var grpc = require('..');
-var testProto = grpc.load(__dirname + '/test.proto').grpc.testing;
+var testProto = grpc.load({
+ root: __dirname + '/../../..',
+ file: 'test/proto/test.proto'}).grpc.testing;
var GoogleAuth = require('google-auth-library');
var assert = require('assert');
@@ -49,6 +51,9 @@ var AUTH_USER = ('155450119199-vefjjaekcc6cmsd5914v6lqufunmh9ue' +
var COMPUTE_ENGINE_USER = ('155450119199-r5aaqa2vqoa9g5mv2m6s3m1l293rlmel' +
'@developer.gserviceaccount.com');
+var ECHO_INITIAL_KEY = 'x-grpc-test-echo-initial';
+var ECHO_TRAILING_KEY = 'x-grpc-test-echo-trailing-bin';
+
/**
* Create a buffer filled with size zeroes
* @param {number} size The length of the buffer
@@ -61,6 +66,27 @@ function zeroBuffer(size) {
}
/**
+ * This is used for testing functions with multiple asynchronous calls that
+ * can happen in different orders. This should be passed the number of async
+ * function invocations that can occur last, and each of those should call this
+ * function's return value
+ * @param {function()} done The function that should be called when a test is
+ * complete.
+ * @param {number} count The number of calls to the resulting function if the
+ * test passes.
+ * @return {function()} The function that should be called at the end of each
+ * sequence of asynchronous functions.
+ */
+function multiDone(done, count) {
+ return function() {
+ count -= 1;
+ if (count <= 0) {
+ done();
+ }
+ };
+}
+
+/**
* Run the empty_unary test
* @param {Client} client The client to test against
* @param {function} done Callback to call when the test is completed. Included
@@ -271,6 +297,54 @@ function timeoutOnSleepingServer(client, done) {
});
}
+function customMetadata(client, done) {
+ done = multiDone(done, 5);
+ var metadata = new grpc.Metadata();
+ metadata.set(ECHO_INITIAL_KEY, 'test_initial_metadata_value');
+ metadata.set(ECHO_TRAILING_KEY, new Buffer('ababab', 'hex'));
+ var arg = {
+ response_type: 'COMPRESSABLE',
+ response_size: 314159,
+ payload: {
+ body: zeroBuffer(271828)
+ }
+ };
+ var streaming_arg = {
+ payload: {
+ body: zeroBuffer(271828)
+ }
+ };
+ var unary = client.unaryCall(arg, function(err, resp) {
+ assert.ifError(err);
+ done();
+ }, metadata);
+ unary.on('metadata', function(metadata) {
+ assert.deepEqual(metadata.get(ECHO_INITIAL_KEY),
+ ['test_initial_metadata_value']);
+ done();
+ });
+ unary.on('status', function(status) {
+ var echo_trailer = status.metadata.get(ECHO_TRAILING_KEY);
+ assert(echo_trailer.length > 0);
+ assert.strictEqual(echo_trailer[0].toString('hex'), 'ababab');
+ done();
+ });
+ var stream = client.fullDuplexCall(metadata);
+ stream.on('metadata', function(metadata) {
+ assert.deepEqual(metadata.get(ECHO_INITIAL_KEY),
+ ['test_initial_metadata_value']);
+ done();
+ });
+ stream.on('status', function(status) {
+ var echo_trailer = status.metadata.get(ECHO_TRAILING_KEY);
+ assert(echo_trailer.length > 0);
+ assert.strictEqual(echo_trailer[0].toString('hex'), 'ababab');
+ done();
+ });
+ stream.write(streaming_arg);
+ stream.end();
+}
+
/**
* Run one of the authentication tests.
* @param {string} expected_user The expected username in the response
@@ -358,6 +432,7 @@ var test_cases = {
cancel_after_begin: cancelAfterBegin,
cancel_after_first_response: cancelAfterFirstResponse,
timeout_on_sleeping_server: timeoutOnSleepingServer,
+ custom_metadata: customMetadata,
compute_engine_creds: _.partial(authTest, COMPUTE_ENGINE_USER, null),
service_account_creds: _.partial(authTest, AUTH_USER, AUTH_SCOPE),
jwt_token_creds: _.partial(authTest, AUTH_USER, null),
diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js
index 99155e9958..3e83304faa 100644
--- a/src/node/interop/interop_server.js
+++ b/src/node/interop/interop_server.js
@@ -37,7 +37,12 @@ var fs = require('fs');
var path = require('path');
var _ = require('lodash');
var grpc = require('..');
-var testProto = grpc.load(__dirname + '/test.proto').grpc.testing;
+var testProto = grpc.load({
+ root: __dirname + '/../../..',
+ file: 'test/proto/test.proto'}).grpc.testing;
+
+var ECHO_INITIAL_KEY = 'x-grpc-test-echo-initial';
+var ECHO_TRAILING_KEY = 'x-grpc-test-echo-trailing-bin';
/**
* Create a buffer filled with size zeroes
@@ -51,6 +56,34 @@ function zeroBuffer(size) {
}
/**
+ * Echos a header metadata item as specified in the interop spec.
+ * @param {Call} call The call to echo metadata on
+ */
+function echoHeader(call) {
+ var echo_initial = call.metadata.get(ECHO_INITIAL_KEY);
+ if (echo_initial.length > 0) {
+ var response_metadata = new grpc.Metadata();
+ response_metadata.set(ECHO_INITIAL_KEY, echo_initial[0]);
+ call.sendMetadata(response_metadata);
+ }
+}
+
+/**
+ * Gets the trailer metadata that should be echoed when the call is done,
+ * as specified in the interop spec.
+ * @param {Call} call The call to get metadata from
+ * @return {grpc.Metadata} The metadata to send as a trailer
+ */
+function getEchoTrailer(call) {
+ var echo_trailer = call.metadata.get(ECHO_TRAILING_KEY);
+ var response_trailer = new grpc.Metadata();
+ if (echo_trailer.length > 0) {
+ response_trailer.set(ECHO_TRAILING_KEY, echo_trailer[0]);
+ }
+ return response_trailer;
+}
+
+/**
* Respond to an empty parameter with an empty response.
* NOTE: this currently does not work due to issue #137
* @param {Call} call Call to handle
@@ -58,7 +91,8 @@ function zeroBuffer(size) {
* or error
*/
function handleEmpty(call, callback) {
- callback(null, {});
+ echoHeader(call);
+ callback(null, {}, getEchoTrailer(call));
}
/**
@@ -68,6 +102,7 @@ function handleEmpty(call, callback) {
* error
*/
function handleUnary(call, callback) {
+ echoHeader(call);
var req = call.request;
var zeros = zeroBuffer(req.response_size);
var payload_type = req.response_type;
@@ -75,7 +110,8 @@ function handleUnary(call, callback) {
payload_type = ['COMPRESSABLE',
'UNCOMPRESSABLE'][Math.random() < 0.5 ? 0 : 1];
}
- callback(null, {payload: {type: payload_type, body: zeros}});
+ callback(null, {payload: {type: payload_type, body: zeros}},
+ getEchoTrailer(call));
}
/**
@@ -85,12 +121,14 @@ function handleUnary(call, callback) {
* error
*/
function handleStreamingInput(call, callback) {
+ echoHeader(call);
var aggregate_size = 0;
call.on('data', function(value) {
aggregate_size += value.payload.body.length;
});
call.on('end', function() {
- callback(null, {aggregated_payload_size: aggregate_size});
+ callback(null, {aggregated_payload_size: aggregate_size},
+ getEchoTrailer(call));
});
}
@@ -99,6 +137,7 @@ function handleStreamingInput(call, callback) {
* @param {Call} call Call to handle
*/
function handleStreamingOutput(call) {
+ echoHeader(call);
var req = call.request;
var payload_type = req.response_type;
if (payload_type === 'RANDOM') {
@@ -113,7 +152,7 @@ function handleStreamingOutput(call) {
}
});
});
- call.end();
+ call.end(getEchoTrailer(call));
}
/**
@@ -122,6 +161,7 @@ function handleStreamingOutput(call) {
* @param {Call} call Call to handle
*/
function handleFullDuplex(call) {
+ echoHeader(call);
call.on('data', function(value) {
var payload_type = value.response_type;
if (payload_type === 'RANDOM') {
@@ -138,7 +178,7 @@ function handleFullDuplex(call) {
});
});
call.on('end', function() {
- call.end();
+ call.end(getEchoTrailer(call));
});
}
diff --git a/src/node/interop/messages.proto b/src/node/interop/messages.proto
deleted file mode 100644
index 7df85e3c13..0000000000
--- a/src/node/interop/messages.proto
+++ /dev/null
@@ -1,132 +0,0 @@
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-// Message definitions to be used by integration test service definitions.
-
-syntax = "proto3";
-
-package grpc.testing;
-
-// The type of payload that should be returned.
-enum PayloadType {
- // Compressable text format.
- COMPRESSABLE = 0;
-
- // Uncompressable binary format.
- UNCOMPRESSABLE = 1;
-
- // Randomly chosen from all other formats defined in this enum.
- RANDOM = 2;
-}
-
-// A block of data, to simply increase gRPC message size.
-message Payload {
- // The type of data in body.
- PayloadType type = 1;
- // Primary contents of payload.
- bytes body = 2;
-}
-
-// Unary request.
-message SimpleRequest {
- // Desired payload type in the response from the server.
- // If response_type is RANDOM, server randomly chooses one from other formats.
- PayloadType response_type = 1;
-
- // Desired payload size in the response from the server.
- // If response_type is COMPRESSABLE, this denotes the size before compression.
- int32 response_size = 2;
-
- // Optional input payload sent along with the request.
- Payload payload = 3;
-
- // Whether SimpleResponse should include username.
- bool fill_username = 4;
-
- // Whether SimpleResponse should include OAuth scope.
- bool fill_oauth_scope = 5;
-}
-
-// Unary response, as configured by the request.
-message SimpleResponse {
- // Payload to increase message size.
- Payload payload = 1;
- // The user the request came from, for verifying authentication was
- // successful when the client expected it.
- string username = 2;
- // OAuth scope.
- string oauth_scope = 3;
-}
-
-// Client-streaming request.
-message StreamingInputCallRequest {
- // Optional input payload sent along with the request.
- Payload payload = 1;
-
- // Not expecting any payload from the response.
-}
-
-// Client-streaming response.
-message StreamingInputCallResponse {
- // Aggregated size of payloads received from the client.
- int32 aggregated_payload_size = 1;
-}
-
-// Configuration for a particular response.
-message ResponseParameters {
- // Desired payload sizes in responses from the server.
- // If response_type is COMPRESSABLE, this denotes the size before compression.
- int32 size = 1;
-
- // Desired interval between consecutive responses in the response stream in
- // microseconds.
- int32 interval_us = 2;
-}
-
-// Server-streaming request.
-message StreamingOutputCallRequest {
- // Desired payload type in the response from the server.
- // If response_type is RANDOM, the payload from each response in the stream
- // might be of different types. This is to simulate a mixed type of payload
- // stream.
- PayloadType response_type = 1;
-
- // Configuration for each expected response message.
- repeated ResponseParameters response_parameters = 2;
-
- // Optional input payload sent along with the request.
- Payload payload = 3;
-}
-
-// Server-streaming response, as configured by the request and parameters.
-message StreamingOutputCallResponse {
- // Payload to increase response size.
- Payload payload = 1;
-}
diff --git a/src/node/interop/test.proto b/src/node/interop/test.proto
deleted file mode 100644
index 24e67497fa..0000000000
--- a/src/node/interop/test.proto
+++ /dev/null
@@ -1,72 +0,0 @@
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-// An integration test service that covers all the method signature permutations
-// of unary/streaming requests/responses.
-
-syntax = "proto3";
-
-import "empty.proto";
-import "messages.proto";
-
-package grpc.testing;
-
-// A simple service to test the various types of RPCs and experiment with
-// performance with various types of payload.
-service TestService {
- // One empty request followed by one empty response.
- rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty);
-
- // One request followed by one response.
- rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
-
- // One request followed by a sequence of responses (streamed download).
- // The server returns the payload with client desired type and sizes.
- rpc StreamingOutputCall(StreamingOutputCallRequest)
- returns (stream StreamingOutputCallResponse);
-
- // A sequence of requests followed by one response (streamed upload).
- // The server returns the aggregated size of client payload as the result.
- rpc StreamingInputCall(stream StreamingInputCallRequest)
- returns (StreamingInputCallResponse);
-
- // A sequence of requests with each request served by the server immediately.
- // As one request could lead to multiple responses, this interface
- // demonstrates the idea of full duplexing.
- rpc FullDuplexCall(stream StreamingOutputCallRequest)
- returns (stream StreamingOutputCallResponse);
-
- // A sequence of requests followed by a sequence of responses.
- // The server buffers all the client requests and then serves them in order. A
- // stream of responses are returned to the client when the server starts with
- // first request.
- rpc HalfDuplexCall(stream StreamingOutputCallRequest)
- returns (stream StreamingOutputCallResponse);
-}
diff --git a/src/node/package.json b/src/node/package.json
deleted file mode 100644
index 0faca7ddf1..0000000000
--- a/src/node/package.json
+++ /dev/null
@@ -1,58 +0,0 @@
-{
- "name": "grpc",
- "version": "0.11.1",
- "author": "Google Inc.",
- "description": "gRPC Library for Node",
- "homepage": "http://www.grpc.io/",
- "repository": {
- "type": "git",
- "url": "https://github.com/grpc/grpc.git"
- },
- "bugs": "https://github.com/grpc/grpc/issues",
- "contributors": [
- {
- "name": "Michael Lumish",
- "email": "mlumish@google.com"
- }
- ],
- "directories": {
- "lib": "src",
- "example": "examples"
- },
- "scripts": {
- "lint": "node ./node_modules/jshint/bin/jshint src test examples interop index.js",
- "test": "./node_modules/.bin/mocha && npm run-script lint",
- "gen_docs": "./node_modules/.bin/jsdoc -c jsdoc_conf.json",
- "coverage": "./node_modules/.bin/istanbul cover ./node_modules/.bin/_mocha"
- },
- "dependencies": {
- "bindings": "^1.2.0",
- "lodash": "^3.9.3",
- "nan": "^2.0.0",
- "protobufjs": "^4.0.0"
- },
- "devDependencies": {
- "async": "^0.9.0",
- "google-auth-library": "^0.9.2",
- "istanbul": "^0.3.21",
- "jsdoc": "^3.3.2",
- "jshint": "^2.5.0",
- "minimist": "^1.1.0",
- "mocha": "~1.21.0",
- "mustache": "^2.0.0"
- },
- "engines": {
- "node": ">=0.10.13"
- },
- "files": [
- "LICENSE",
- "README.md",
- "index.js",
- "binding.gyp",
- "ext",
- "health_check",
- "src"
- ],
- "main": "index.js",
- "license": "BSD-3-Clause"
-}
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 7f510231b3..33ddb3cec4 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -40,7 +40,7 @@
var _ = require('lodash');
-var grpc = require('bindings')('grpc.node');
+var grpc = require('bindings')('grpc_node');
var common = require('./common');
@@ -54,7 +54,7 @@ var Readable = stream.Readable;
var Writable = stream.Writable;
var Duplex = stream.Duplex;
var util = require('util');
-var version = require('../package.json').version;
+var version = require('../../../package.json').version;
util.inherits(ClientWritableStream, Writable);
diff --git a/src/node/src/metadata.js b/src/node/src/metadata.js
index c1da70b197..5c24e46c9b 100644
--- a/src/node/src/metadata.js
+++ b/src/node/src/metadata.js
@@ -59,6 +59,7 @@ function normalizeKey(key) {
function validate(key, value) {
if (_.endsWith(key, '-bin')) {
if (!(value instanceof Buffer)) {
+ console.log(value.constructor.toString());
throw new Error('keys that end with \'-bin\' must have Buffer values');
}
} else {
@@ -173,7 +174,9 @@ Metadata.prototype._getCoreRepresentation = function() {
Metadata._fromCoreRepresentation = function(metadata) {
var newMetadata = new Metadata();
if (metadata) {
- newMetadata._internal_repr = _.cloneDeep(metadata);
+ _.forOwn(metadata, function(value, key) {
+ newMetadata._internal_repr[key] = _.clone(value);
+ });
}
return newMetadata;
};
diff --git a/src/node/src/server.js b/src/node/src/server.js
index 70b4a9d80e..87b5b7ad06 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -40,7 +40,7 @@
var _ = require('lodash');
-var grpc = require('bindings')('grpc.node');
+var grpc = require('bindings')('grpc_node');
var common = require('./common');
diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js
index e7f071bcd5..ec502183c1 100644
--- a/src/node/test/call_test.js
+++ b/src/node/test/call_test.js
@@ -34,7 +34,7 @@
'use strict';
var assert = require('assert');
-var grpc = require('bindings')('grpc.node');
+var grpc = require('bindings')('grpc_node');
/**
* Helper function to return an absolute deadline given a relative timeout in
diff --git a/src/node/test/channel_test.js b/src/node/test/channel_test.js
index d81df2a36d..4418dfff4c 100644
--- a/src/node/test/channel_test.js
+++ b/src/node/test/channel_test.js
@@ -34,7 +34,7 @@
'use strict';
var assert = require('assert');
-var grpc = require('bindings')('grpc.node');
+var grpc = require('bindings')('grpc_node');
/**
* This is used for testing functions with multiple asynchronous calls that
diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js
index fa06ad4e4d..b17cd339cb 100644
--- a/src/node/test/constant_test.js
+++ b/src/node/test/constant_test.js
@@ -34,7 +34,7 @@
'use strict';
var assert = require('assert');
-var grpc = require('bindings')('grpc.node');
+var grpc = require('bindings')('grpc_node');
/**
* List of all status names
diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js
index 4b8da3bfb1..813221dafe 100644
--- a/src/node/test/end_to_end_test.js
+++ b/src/node/test/end_to_end_test.js
@@ -34,7 +34,7 @@
'use strict';
var assert = require('assert');
-var grpc = require('bindings')('grpc.node');
+var grpc = require('bindings')('grpc_node');
/**
* This is used for testing functions with multiple asynchronous calls that
diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js
index 2ca07c1d50..804c1d45e4 100644
--- a/src/node/test/interop_sanity_test.js
+++ b/src/node/test/interop_sanity_test.js
@@ -90,4 +90,8 @@ describe('Interop tests', function() {
interop_client.runTest(port, name_override, 'timeout_on_sleeping_server',
true, true, done);
});
+ it('should pass custom_metadata', function(done) {
+ interop_client.runTest(port, name_override, 'custom_metadata',
+ true, true, done);
+ });
});
diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js
index 1e69d52e58..999a183b3c 100644
--- a/src/node/test/server_test.js
+++ b/src/node/test/server_test.js
@@ -36,7 +36,7 @@
var assert = require('assert');
var fs = require('fs');
var path = require('path');
-var grpc = require('bindings')('grpc.node');
+var grpc = require('bindings')('grpc_node');
describe('server', function() {
describe('constructor', function() {
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 85f399c448..49643f9004 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -356,7 +356,7 @@ describe('Echo metadata', function() {
call.end();
});
it('shows the correct user-agent string', function(done) {
- var version = require('../package.json').version;
+ var version = require('../../../package.json').version;
var call = client.unary({}, function(err, data) { assert.ifError(err); },
metadata);
call.on('metadata', function(metadata) {