aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c1
-rw-r--r--src/core/channel/context.h7
-rw-r--r--src/core/httpcli/httpcli.c10
-rw-r--r--src/core/iomgr/fd_posix.c98
-rw-r--r--src/core/iomgr/fd_posix.h28
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c9
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c14
-rw-r--r--src/core/iomgr/pollset_posix.c20
-rw-r--r--src/core/iomgr/pollset_posix.h11
-rw-r--r--src/core/iomgr/sockaddr_utils.c6
-rw-r--r--src/core/iomgr/tcp_posix.c4
-rw-r--r--src/core/security/auth_filters.h (renamed from src/core/security/auth.h)7
-rw-r--r--src/core/security/client_auth_filter.c (renamed from src/core/security/auth.c)26
-rw-r--r--src/core/security/credentials.c216
-rw-r--r--src/core/security/credentials.h41
-rw-r--r--src/core/security/credentials_metadata.c101
-rw-r--r--src/core/security/security_connector.c77
-rw-r--r--src/core/security/security_connector.h5
-rw-r--r--src/core/security/security_context.c150
-rw-r--r--src/core/security/security_context.h48
-rw-r--r--src/core/security/server_auth_filter.c128
-rw-r--r--src/core/security/server_secure_chttp2.c25
-rw-r--r--src/core/support/cmdline.c70
-rw-r--r--src/core/support/subprocess_posix.c8
-rw-r--r--src/core/surface/call.c84
-rw-r--r--src/core/surface/call.h13
-rw-r--r--src/core/surface/call_log_batch.c16
-rw-r--r--src/core/surface/completion_queue.c4
-rw-r--r--src/core/surface/lame_client.c1
-rw-r--r--src/core/surface/secure_channel_create.c4
-rw-r--r--src/core/surface/server.c22
-rw-r--r--src/core/surface/server.h4
-rw-r--r--src/core/surface/server_chttp2.c3
-rw-r--r--src/core/transport/chttp2/alpn.c3
-rw-r--r--src/core/transport/chttp2/frame.h2
-rw-r--r--src/core/transport/chttp2/frame_rst_stream.c40
-rw-r--r--src/core/transport/chttp2/frame_rst_stream.h11
-rw-r--r--src/core/transport/chttp2/hpack_parser.c4
-rw-r--r--src/core/transport/chttp2/hpack_parser.h2
-rw-r--r--src/core/transport/chttp2_transport.c117
-rw-r--r--src/core/transport/metadata.h2
-rw-r--r--src/core/transport/transport.h3
-rw-r--r--src/core/tsi/ssl_transport_security.c10
43 files changed, 1093 insertions, 362 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 78f8d06d89..42e242ae81 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -144,6 +144,7 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (op->send_ops) {
+ grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
op->on_done_send(op->send_user_data, 0);
}
if (op->recv_ops) {
diff --git a/src/core/channel/context.h b/src/core/channel/context.h
index e2e5e80513..ac5796b9ef 100644
--- a/src/core/channel/context.h
+++ b/src/core/channel/context.h
@@ -41,4 +41,9 @@ typedef enum {
GRPC_CONTEXT_COUNT
} grpc_context_index;
-#endif
+typedef struct {
+ void *value;
+ void (*destroy)(void *);
+} grpc_call_context_element;
+
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONTEXT_H */
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index fe7ea6a86b..6e4156c385 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -67,7 +67,6 @@ static grpc_httpcli_post_override g_post_override = NULL;
static void next_address(internal_request *req);
static void finish(internal_request *req, int success) {
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
req->on_response(req->user_data, success ? &req->parser.r : NULL);
grpc_httpcli_parser_destroy(&req->parser);
if (req->addresses != NULL) {
@@ -86,8 +85,6 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
internal_request *req = user_data;
size_t i;
- gpr_log(GPR_DEBUG, "%s nslices=%d status=%d", __FUNCTION__, nslices, status);
-
for (i = 0; i < nslices; i++) {
if (GPR_SLICE_LENGTH(slices[i])) {
req->have_read_byte = 1;
@@ -120,13 +117,11 @@ done:
}
static void on_written(internal_request *req) {
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
grpc_endpoint_notify_on_read(req->ep, on_read, req);
}
static void done_write(void *arg, grpc_endpoint_cb_status status) {
internal_request *req = arg;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
switch (status) {
case GRPC_ENDPOINT_CB_OK:
on_written(req);
@@ -141,7 +136,6 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) {
static void start_write(internal_request *req) {
gpr_slice_ref(req->request_text);
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
switch (
grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) {
case GRPC_ENDPOINT_WRITE_DONE:
@@ -159,7 +153,6 @@ static void on_secure_transport_setup_done(void *rp,
grpc_security_status status,
grpc_endpoint *secure_endpoint) {
internal_request *req = rp;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
if (status != GRPC_SECURITY_OK) {
gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
finish(req, 0);
@@ -172,7 +165,6 @@ static void on_secure_transport_setup_done(void *rp,
static void on_connected(void *arg, grpc_endpoint *tcp) {
internal_request *req = arg;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
if (!tcp) {
next_address(req);
return;
@@ -200,7 +192,6 @@ static void on_connected(void *arg, grpc_endpoint *tcp) {
static void next_address(internal_request *req) {
grpc_resolved_address *addr;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
if (req->next_address == req->addresses->naddrs) {
finish(req, 0);
return;
@@ -212,7 +203,6 @@ static void next_address(internal_request *req) {
static void on_resolved(void *arg, grpc_resolved_addresses *addresses) {
internal_request *req = arg;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
if (!addresses) {
finish(req, 0);
return;
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 9c8133d2d4..b697fcc64a 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -96,8 +96,10 @@ static grpc_fd *alloc_fd(int fd) {
gpr_atm_rel_store(&r->writest, NOT_READY);
gpr_atm_rel_store(&r->shutdown, 0);
r->fd = fd;
- r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
+ r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
+ &r->inactive_watcher_root;
r->freelist_next = NULL;
+ r->read_watcher = r->write_watcher = NULL;
return r;
}
@@ -147,14 +149,34 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
-static void wake_watchers(grpc_fd *fd) {
- grpc_fd_watcher *watcher;
+static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+ if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
+ grpc_pollset_force_kick(fd->inactive_watcher_root.next->pollset);
+ } else if (fd->read_watcher) {
+ grpc_pollset_force_kick(fd->read_watcher->pollset);
+ } else if (fd->write_watcher) {
+ grpc_pollset_force_kick(fd->write_watcher->pollset);
+ }
+}
+
+static void maybe_wake_one_watcher(grpc_fd *fd) {
gpr_mu_lock(&fd->watcher_mu);
- for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
- watcher = watcher->next) {
+ maybe_wake_one_watcher_locked(fd);
+ gpr_mu_unlock(&fd->watcher_mu);
+}
+
+static void wake_all_watchers_locked(grpc_fd *fd) {
+ grpc_fd_watcher *watcher;
+ for (watcher = fd->inactive_watcher_root.next;
+ watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
grpc_pollset_force_kick(watcher->pollset);
}
- gpr_mu_unlock(&fd->watcher_mu);
+ if (fd->read_watcher) {
+ grpc_pollset_force_kick(fd->read_watcher->pollset);
+ }
+ if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
+ grpc_pollset_force_kick(fd->write_watcher->pollset);
+ }
}
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
@@ -162,7 +184,9 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
fd->on_done_user_data = user_data;
shutdown(fd->fd, SHUT_RDWR);
ref_by(fd, 1); /* remove active status, but keep referenced */
- wake_watchers(fd);
+ gpr_mu_lock(&fd->watcher_mu);
+ wake_all_watchers_locked(fd);
+ gpr_mu_unlock(&fd->watcher_mu);
unref_by(fd, 2); /* drop the reference */
}
@@ -204,7 +228,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
set_ready call. NOTE: we don't have an ABA problem here,
since we should never have concurrent calls to the same
notify_on function. */
- wake_watchers(fd);
+ maybe_wake_one_watcher(fd);
return;
}
/* swap was unsuccessful due to an intervening set_ready call.
@@ -290,29 +314,65 @@ void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
gpr_uint32 read_mask, gpr_uint32 write_mask,
grpc_fd_watcher *watcher) {
+ gpr_uint32 mask = 0;
/* keep track of pollers that have requested our events, in case they change
*/
grpc_fd_ref(fd);
gpr_mu_lock(&fd->watcher_mu);
- watcher->next = &fd->watcher_root;
- watcher->prev = watcher->next->prev;
- watcher->next->prev = watcher->prev->next = watcher;
+ /* if there is nobody polling for read, but we need to, then start doing so */
+ if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+ fd->read_watcher = watcher;
+ mask |= read_mask;
+ }
+ /* if there is nobody polling for write, but we need to, then start doing so
+ */
+ if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+ fd->write_watcher = watcher;
+ mask |= write_mask;
+ }
+ /* if not polling, remember this watcher in case we need someone to later */
+ if (mask == 0) {
+ watcher->next = &fd->inactive_watcher_root;
+ watcher->prev = watcher->next->prev;
+ watcher->next->prev = watcher->prev->next = watcher;
+ }
watcher->pollset = pollset;
watcher->fd = fd;
gpr_mu_unlock(&fd->watcher_mu);
- return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) |
- (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0);
+ return mask;
}
-void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
- gpr_mu_lock(&watcher->fd->watcher_mu);
- watcher->next->prev = watcher->prev;
- watcher->prev->next = watcher->next;
- gpr_mu_unlock(&watcher->fd->watcher_mu);
+void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
+ int was_polling = 0;
+ int kick = 0;
+ grpc_fd *fd = watcher->fd;
+
+ gpr_mu_lock(&fd->watcher_mu);
+ if (watcher == fd->read_watcher) {
+ /* remove read watcher, kick if we still need a read */
+ was_polling = 1;
+ kick = kick || !got_read;
+ fd->read_watcher = NULL;
+ }
+ if (watcher == fd->write_watcher) {
+ /* remove write watcher, kick if we still need a write */
+ was_polling = 1;
+ kick = kick || !got_write;
+ fd->write_watcher = NULL;
+ }
+ if (!was_polling) {
+ /* remove from inactive list */
+ watcher->next->prev = watcher->prev;
+ watcher->prev->next = watcher->next;
+ }
+ if (kick) {
+ maybe_wake_one_watcher_locked(fd);
+ }
+ gpr_mu_unlock(&fd->watcher_mu);
- grpc_fd_unref(watcher->fd);
+ grpc_fd_unref(fd);
}
void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index be21f2b55f..cfc533b7f5 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -66,8 +66,32 @@ struct grpc_fd {
gpr_mu set_state_mu;
gpr_atm shutdown;
+ /* The watcher list.
+
+ The following watcher related fields are protected by watcher_mu.
+
+ An fd_watcher is an ephemeral object created when an fd wants to
+ begin polling, and destroyed after the poll.
+
+ It denotes the fd's interest in whether to read poll or write poll
+ or both or neither on this fd.
+
+ If a watcher is asked to poll for reads or writes, the read_watcher
+ or write_watcher fields are set respectively. A watcher may be asked
+ to poll for both, in which case both fields will be set.
+
+ read_watcher and write_watcher may be NULL if no watcher has been
+ asked to poll for reads or writes.
+
+ If an fd_watcher is not asked to poll for reads or writes, it's added
+ to a linked list of inactive watchers, rooted at inactive_watcher_root.
+ If at a later time there becomes need of a poller to poll, one of
+ the inactive pollers may be kicked out of their poll loops to take
+ that responsibility. */
gpr_mu watcher_mu;
- grpc_fd_watcher watcher_root;
+ grpc_fd_watcher inactive_watcher_root;
+ grpc_fd_watcher *read_watcher;
+ grpc_fd_watcher *write_watcher;
gpr_atm readst;
gpr_atm writest;
@@ -103,7 +127,7 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
gpr_uint32 read_mask, gpr_uint32 write_mask,
grpc_fd_watcher *rec);
/* Complete polling previously started with grpc_fd_begin_poll */
-void grpc_fd_end_poll(grpc_fd_watcher *rec);
+void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write);
/* Return 1 if this fd is orphaned, 0 otherwise */
int grpc_fd_is_orphaned(grpc_fd *fd);
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index a1c3938a33..40b7935a57 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -164,8 +164,8 @@ static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
multipoll_with_epoll_pollset_maybe_work, epoll_kick,
multipoll_with_epoll_pollset_destroy};
-void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
- size_t nfds) {
+static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
+ size_t nfds) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
struct epoll_event ev;
@@ -194,4 +194,7 @@ void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
}
}
-#endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */
+grpc_platform_become_multipoller_type grpc_platform_become_multipoller =
+ epoll_become_multipoller;
+
+#endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 25b7cfda1a..d781c9b4bb 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -33,7 +33,7 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_POSIX_MULTIPOLL_WITH_POLL
+#ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/pollset_posix.h"
@@ -98,7 +98,8 @@ static void end_polling(grpc_pollset *pollset) {
pollset_hdr *h;
h = pollset->data.ptr;
for (i = 1; i < h->pfd_count; i++) {
- grpc_fd_end_poll(&h->watchers[i]);
+ grpc_fd_end_poll(&h->watchers[i], h->pfds[i].revents & POLLIN,
+ h->pfds[i].revents & POLLOUT);
}
}
@@ -228,8 +229,8 @@ static const grpc_pollset_vtable multipoll_with_poll_pollset = {
multipoll_with_poll_pollset_maybe_work, multipoll_with_poll_pollset_kick,
multipoll_with_poll_pollset_destroy};
-void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
- size_t nfds) {
+void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
+ size_t nfds) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
pollset->vtable = &multipoll_with_poll_pollset;
@@ -250,4 +251,9 @@ void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
}
}
+#endif /* GPR_POSIX_SOCKET */
+
+#ifdef GPR_POSIX_MULTIPOLL_WITH_POLL
+grpc_platform_become_multipoller_type grpc_platform_become_multipoller =
+ grpc_poll_become_multipoller;
#endif
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 4d1bcad9e2..826c792990 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -174,6 +174,8 @@ void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
/* pollset->mu already held */
gpr_timespec now = gpr_now();
+ /* FIXME(ctiller): see below */
+ gpr_timespec maximum_deadline = gpr_time_add(now, gpr_time_from_seconds(1));
int r;
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
@@ -184,6 +186,11 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
return 1;
}
+ /* FIXME(ctiller): we should not clamp deadline, however we have some
+ stuck at shutdown bugs that this resolves */
+ if (gpr_time_cmp(deadline, maximum_deadline) > 0) {
+ deadline = maximum_deadline;
+ }
gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
r = pollset->vtable->maybe_work(pollset, deadline, now, 1);
gpr_tls_set(&g_current_thread_poller, 0);
@@ -258,7 +265,6 @@ static void unary_poll_do_promote(void *args, int success) {
grpc_pollset *pollset = up_args->pollset;
grpc_fd *fd = up_args->fd;
int do_shutdown_cb = 0;
- gpr_free(up_args);
/*
* This is quite tricky. There are a number of cases to keep in mind here:
@@ -273,8 +279,12 @@ static void unary_poll_do_promote(void *args, int success) {
/* First we need to ensure that nobody is polling concurrently */
while (pollset->counter != 0) {
grpc_pollset_kick(pollset);
- gpr_cv_wait(&pollset->cv, &pollset->mu, gpr_inf_future);
+ grpc_iomgr_add_callback(unary_poll_do_promote, up_args);
+ gpr_mu_unlock(&pollset->mu);
+ return;
}
+
+ gpr_free(up_args);
/* At this point the pollset may no longer be a unary poller. In that case
* we should just call the right add function and be done. */
/* TODO(klempner): If we're not careful this could cause infinite recursion.
@@ -410,10 +420,12 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
- r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout);
+ /* poll fd count (argument 2) is shortened by one if we have no events
+ to poll on - such that it only includes the kicker */
+ r = poll(pfd, GPR_ARRAY_SIZE(pfd) - (pfd[1].events == 0), timeout);
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
- grpc_fd_end_poll(&fd_watcher);
+ grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN, pfd[1].revents & POLLOUT);
if (r < 0) {
if (errno != EINTR) {
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index da843f7381..088ec910c2 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -101,7 +101,12 @@ void grpc_kick_drain(grpc_pollset *p);
grpc_pollset *grpc_backup_pollset(void);
/* turn a pollset into a multipoller: platform specific */
-void grpc_platform_become_multipoller(grpc_pollset *pollset,
- struct grpc_fd **fds, size_t fd_count);
+typedef void (*grpc_platform_become_multipoller_type)(grpc_pollset *pollset,
+ struct grpc_fd **fds,
+ size_t fd_count);
+extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller;
-#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
+void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
+ size_t fd_count);
+
+#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c
index 740bbe716e..3d202a5cc8 100644
--- a/src/core/iomgr/sockaddr_utils.c
+++ b/src/core/iomgr/sockaddr_utils.c
@@ -169,8 +169,7 @@ int grpc_sockaddr_get_port(const struct sockaddr *addr) {
case AF_UNIX:
return 1;
default:
- gpr_log(GPR_ERROR, "Unknown socket family %d in %s", addr->sa_family,
- __FUNCTION__);
+ gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_get_port", addr->sa_family);
return 0;
}
}
@@ -184,8 +183,7 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port) {
((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
return 1;
default:
- gpr_log(GPR_ERROR, "Unknown socket family %d in %s", addr->sa_family,
- __FUNCTION__);
+ gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port", addr->sa_family);
return 0;
}
}
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index f7dae5f86c..cd6b2ecae6 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -138,8 +138,10 @@ static void slice_state_remove_prefix(grpc_tcp_slice_state *state,
native "trim the first N bytes" operation to splice */
/* TODO(klempner): This really shouldn't be modifying the current slice
unless we own the slices array. */
- *current_slice = gpr_slice_split_tail(current_slice, prefix_bytes);
+ gpr_slice tail;
+ tail = gpr_slice_split_tail(current_slice, prefix_bytes);
gpr_slice_unref(*current_slice);
+ *current_slice = tail;
return;
} else {
gpr_slice_unref(*current_slice);
diff --git a/src/core/security/auth.h b/src/core/security/auth_filters.h
index 08dc4152ba..ff921690e0 100644
--- a/src/core/security/auth.h
+++ b/src/core/security/auth_filters.h
@@ -31,11 +31,12 @@
*
*/
-#ifndef GRPC_INTERNAL_CORE_SECURITY_AUTH_H
-#define GRPC_INTERNAL_CORE_SECURITY_AUTH_H
+#ifndef GRPC_INTERNAL_CORE_SECURITY_AUTH_FILTERS_H
+#define GRPC_INTERNAL_CORE_SECURITY_AUTH_FILTERS_H
#include "src/core/channel/channel_stack.h"
extern const grpc_channel_filter grpc_client_auth_filter;
+extern const grpc_channel_filter grpc_server_auth_filter;
-#endif /* GRPC_INTERNAL_CORE_SECURITY_AUTH_H */
+#endif /* GRPC_INTERNAL_CORE_SECURITY_AUTH_FILTERS_H */
diff --git a/src/core/security/auth.c b/src/core/security/client_auth_filter.c
index faf12d8f14..b2bce1fd32 100644
--- a/src/core/security/auth.c
+++ b/src/core/security/client_auth_filter.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/security/auth.h"
+#include "src/core/security/auth_filters.h"
#include <string.h>
@@ -77,11 +77,13 @@ static void bubble_up_error(grpc_call_element *elem, const char *error_msg) {
grpc_call_next_op(elem, &calld->op);
}
-static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems,
+static void on_credentials_metadata(void *user_data,
+ grpc_credentials_md *md_elems,
size_t num_md,
grpc_credentials_status status) {
grpc_call_element *elem = (grpc_call_element *)user_data;
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
grpc_transport_op *op = &calld->op;
grpc_metadata_batch *mdb;
size_t i;
@@ -94,8 +96,10 @@ static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems,
op->send_ops->ops[calld->op_md_idx].type == GRPC_OP_METADATA);
mdb = &op->send_ops->ops[calld->op_md_idx].data.metadata;
for (i = 0; i < num_md; i++) {
- grpc_metadata_batch_add_tail(mdb, &calld->md_links[i],
- grpc_mdelem_ref(md_elems[i]));
+ grpc_metadata_batch_add_tail(
+ mdb, &calld->md_links[i],
+ grpc_mdelem_from_slices(chand->md_ctx, gpr_slice_ref(md_elems[i].key),
+ gpr_slice_ref(md_elems[i].value)));
}
grpc_call_next_op(elem, op);
}
@@ -125,7 +129,7 @@ static void send_security_metadata(grpc_call_element *elem,
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_client_security_context *ctx =
- (grpc_client_security_context *)op->context[GRPC_CONTEXT_SECURITY];
+ (grpc_client_security_context *)op->context[GRPC_CONTEXT_SECURITY].value;
char *service_url = NULL;
grpc_credentials *channel_creds =
chand->security_connector->request_metadata_creds;
@@ -189,6 +193,8 @@ static void auth_start_transport_op(grpc_call_element *elem,
grpc_linked_mdelem *l;
size_t i;
+ /* TODO(jboeuf): write the call auth context. */
+
if (op->send_ops && !calld->sent_initial_metadata) {
size_t nops = op->send_ops->nops;
grpc_stream_op *ops = op->send_ops->ops;
@@ -273,7 +279,7 @@ static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
- grpc_security_connector *ctx = grpc_find_security_connector_in_args(args);
+ grpc_security_connector *sc = grpc_find_security_connector_in_args(args);
/* grab pointers to our data from the channel element */
channel_data *chand = elem->channel_data;
@@ -282,12 +288,12 @@ static void init_channel_elem(grpc_channel_element *elem,
path */
GPR_ASSERT(!is_first);
GPR_ASSERT(!is_last);
- GPR_ASSERT(ctx != NULL);
+ GPR_ASSERT(sc != NULL);
/* initialize members */
- GPR_ASSERT(ctx->is_client_side);
+ GPR_ASSERT(sc->is_client_side);
chand->security_connector =
- (grpc_channel_security_connector *)grpc_security_connector_ref(ctx);
+ (grpc_channel_security_connector *)grpc_security_connector_ref(sc);
chand->md_ctx = metadata_context;
chand->authority_string =
grpc_mdstr_from_string(chand->md_ctx, ":authority");
@@ -321,4 +327,4 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
const grpc_channel_filter grpc_client_auth_filter = {
auth_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "auth"};
+ destroy_channel_elem, "client-auth"};
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index f6366f0750..ae22bf47a0 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -114,20 +114,6 @@ void grpc_credentials_get_request_metadata(grpc_credentials *creds,
creds->vtable->get_request_metadata(creds, service_url, cb, user_data);
}
-grpc_mdctx *grpc_credentials_get_or_create_metadata_context(
- grpc_credentials *creds) {
- grpc_mdctx *mdctx = NULL;
- if (creds != NULL && creds->vtable->get_metadata_context != NULL) {
- mdctx = creds->vtable->get_metadata_context(creds);
- }
- if (mdctx == NULL) {
- return grpc_mdctx_create();
- } else {
- grpc_mdctx_ref(mdctx);
- return mdctx;
- }
-}
-
grpc_security_status grpc_credentials_create_security_connector(
grpc_credentials *creds, const char *target, const grpc_channel_args *args,
grpc_credentials *request_metadata_creds,
@@ -208,10 +194,6 @@ static int ssl_has_request_metadata_only(const grpc_credentials *creds) {
return 0;
}
-static grpc_mdctx *ssl_get_metadata_context(grpc_credentials *creds) {
- return NULL;
-}
-
static grpc_security_status ssl_create_security_connector(
grpc_credentials *creds, const char *target, const grpc_channel_args *args,
grpc_credentials *request_metadata_creds,
@@ -249,8 +231,8 @@ static grpc_security_status ssl_server_create_security_connector(
}
static grpc_credentials_vtable ssl_vtable = {
- ssl_destroy, ssl_has_request_metadata, ssl_has_request_metadata_only,
- ssl_get_metadata_context, NULL, ssl_create_security_connector};
+ ssl_destroy, ssl_has_request_metadata, ssl_has_request_metadata_only, NULL,
+ ssl_create_security_connector};
static grpc_server_credentials_vtable ssl_server_vtable = {
ssl_server_destroy, ssl_server_create_security_connector};
@@ -341,13 +323,12 @@ grpc_server_credentials *grpc_ssl_server_credentials_create(
typedef struct {
grpc_credentials base;
- grpc_mdctx *md_ctx;
/* Have a simple cache for now with just 1 entry. We could have a map based on
the service_url for a more sophisticated one. */
gpr_mu cache_mu;
struct {
- grpc_mdelem *jwt_md;
+ grpc_credentials_md_store *jwt_md;
char *service_url;
gpr_timespec jwt_expiration;
} cached;
@@ -358,7 +339,7 @@ typedef struct {
static void jwt_reset_cache(grpc_jwt_credentials *c) {
if (c->cached.jwt_md != NULL) {
- grpc_mdelem_unref(c->cached.jwt_md);
+ grpc_credentials_md_store_unref(c->cached.jwt_md);
c->cached.jwt_md = NULL;
}
if (c->cached.service_url != NULL) {
@@ -373,7 +354,6 @@ static void jwt_destroy(grpc_credentials *creds) {
grpc_auth_json_key_destruct(&c->key);
jwt_reset_cache(c);
gpr_mu_destroy(&c->cache_mu);
- grpc_mdctx_unref(c->md_ctx);
gpr_free(c);
}
@@ -393,7 +373,7 @@ static void jwt_get_request_metadata(grpc_credentials *creds,
0};
/* See if we can return a cached jwt. */
- grpc_mdelem *jwt_md = NULL;
+ grpc_credentials_md_store *jwt_md = NULL;
{
gpr_mu_lock(&c->cache_mu);
if (c->cached.service_url != NULL &&
@@ -401,7 +381,7 @@ static void jwt_get_request_metadata(grpc_credentials *creds,
c->cached.jwt_md != NULL &&
(gpr_time_cmp(gpr_time_sub(c->cached.jwt_expiration, gpr_now()),
refresh_threshold) > 0)) {
- jwt_md = grpc_mdelem_ref(c->cached.jwt_md);
+ jwt_md = grpc_credentials_md_store_ref(c->cached.jwt_md);
}
gpr_mu_unlock(&c->cache_mu);
}
@@ -418,30 +398,26 @@ static void jwt_get_request_metadata(grpc_credentials *creds,
gpr_free(jwt);
c->cached.jwt_expiration = gpr_time_add(gpr_now(), c->jwt_lifetime);
c->cached.service_url = gpr_strdup(service_url);
- c->cached.jwt_md = grpc_mdelem_from_strings(
- c->md_ctx, GRPC_AUTHORIZATION_METADATA_KEY, md_value);
+ c->cached.jwt_md = grpc_credentials_md_store_create(1);
+ grpc_credentials_md_store_add_cstrings(
+ c->cached.jwt_md, GRPC_AUTHORIZATION_METADATA_KEY, md_value);
gpr_free(md_value);
- jwt_md = grpc_mdelem_ref(c->cached.jwt_md);
+ jwt_md = grpc_credentials_md_store_ref(c->cached.jwt_md);
}
gpr_mu_unlock(&c->cache_mu);
}
if (jwt_md != NULL) {
- cb(user_data, &jwt_md, 1, GRPC_CREDENTIALS_OK);
- grpc_mdelem_unref(jwt_md);
+ cb(user_data, jwt_md->entries, jwt_md->num_entries, GRPC_CREDENTIALS_OK);
+ grpc_credentials_md_store_unref(jwt_md);
} else {
cb(user_data, NULL, 0, GRPC_CREDENTIALS_ERROR);
}
}
-static grpc_mdctx *jwt_get_metadata_context(grpc_credentials *creds) {
- grpc_jwt_credentials *c = (grpc_jwt_credentials *)creds;
- return c->md_ctx;
-}
-
static grpc_credentials_vtable jwt_vtable = {
jwt_destroy, jwt_has_request_metadata, jwt_has_request_metadata_only,
- jwt_get_metadata_context, jwt_get_request_metadata, NULL};
+ jwt_get_request_metadata, NULL};
grpc_credentials *grpc_jwt_credentials_create(const char *json_key,
gpr_timespec token_lifetime) {
@@ -456,7 +432,6 @@ grpc_credentials *grpc_jwt_credentials_create(const char *json_key,
c->base.type = GRPC_CREDENTIALS_TYPE_JWT;
gpr_ref_init(&c->base.refcount, 1);
c->base.vtable = &jwt_vtable;
- c->md_ctx = grpc_mdctx_create();
c->key = key;
c->jwt_lifetime = token_lifetime;
gpr_mu_init(&c->cache_mu);
@@ -476,8 +451,7 @@ typedef void (*grpc_fetch_oauth2_func)(grpc_credentials_metadata_request *req,
typedef struct {
grpc_credentials base;
gpr_mu mu;
- grpc_mdctx *md_ctx;
- grpc_mdelem *access_token_md;
+ grpc_credentials_md_store *access_token_md;
gpr_timespec token_expiration;
grpc_fetch_oauth2_func fetch_func;
} grpc_oauth2_token_fetcher_credentials;
@@ -485,11 +459,8 @@ typedef struct {
static void oauth2_token_fetcher_destroy(grpc_credentials *creds) {
grpc_oauth2_token_fetcher_credentials *c =
(grpc_oauth2_token_fetcher_credentials *)creds;
- if (c->access_token_md != NULL) {
- grpc_mdelem_unref(c->access_token_md);
- }
+ grpc_credentials_md_store_unref(c->access_token_md);
gpr_mu_destroy(&c->mu);
- grpc_mdctx_unref(c->md_ctx);
gpr_free(c);
}
@@ -505,8 +476,8 @@ static int oauth2_token_fetcher_has_request_metadata_only(
grpc_credentials_status
grpc_oauth2_token_fetcher_credentials_parse_server_response(
- const grpc_httpcli_response *response, grpc_mdctx *ctx,
- grpc_mdelem **token_elem, gpr_timespec *token_lifetime) {
+ const grpc_httpcli_response *response,
+ grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime) {
char *null_terminated_body = NULL;
char *new_access_token = NULL;
grpc_credentials_status status = GRPC_CREDENTIALS_OK;
@@ -574,16 +545,17 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response(
access_token->value);
token_lifetime->tv_sec = strtol(expires_in->value, NULL, 10);
token_lifetime->tv_nsec = 0;
- if (*token_elem != NULL) grpc_mdelem_unref(*token_elem);
- *token_elem = grpc_mdelem_from_strings(ctx, GRPC_AUTHORIZATION_METADATA_KEY,
- new_access_token);
+ if (*token_md != NULL) grpc_credentials_md_store_unref(*token_md);
+ *token_md = grpc_credentials_md_store_create(1);
+ grpc_credentials_md_store_add_cstrings(
+ *token_md, GRPC_AUTHORIZATION_METADATA_KEY, new_access_token);
status = GRPC_CREDENTIALS_OK;
}
end:
- if (status != GRPC_CREDENTIALS_OK && (*token_elem != NULL)) {
- grpc_mdelem_unref(*token_elem);
- *token_elem = NULL;
+ if (status != GRPC_CREDENTIALS_OK && (*token_md != NULL)) {
+ grpc_credentials_md_store_unref(*token_md);
+ *token_md = NULL;
}
if (null_terminated_body != NULL) gpr_free(null_terminated_body);
if (new_access_token != NULL) gpr_free(new_access_token);
@@ -602,10 +574,11 @@ static void on_oauth2_token_fetcher_http_response(
gpr_mu_lock(&c->mu);
status = grpc_oauth2_token_fetcher_credentials_parse_server_response(
- response, c->md_ctx, &c->access_token_md, &token_lifetime);
+ response, &c->access_token_md, &token_lifetime);
if (status == GRPC_CREDENTIALS_OK) {
c->token_expiration = gpr_time_add(gpr_now(), token_lifetime);
- r->cb(r->user_data, &c->access_token_md, 1, status);
+ r->cb(r->user_data, c->access_token_md->entries,
+ c->access_token_md->num_entries, status);
} else {
c->token_expiration = gpr_inf_past;
r->cb(r->user_data, NULL, 0, status);
@@ -621,19 +594,20 @@ static void oauth2_token_fetcher_get_request_metadata(
(grpc_oauth2_token_fetcher_credentials *)creds;
gpr_timespec refresh_threshold = {GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS,
0};
- grpc_mdelem *cached_access_token_md = NULL;
+ grpc_credentials_md_store *cached_access_token_md = NULL;
{
gpr_mu_lock(&c->mu);
if (c->access_token_md != NULL &&
(gpr_time_cmp(gpr_time_sub(c->token_expiration, gpr_now()),
refresh_threshold) > 0)) {
- cached_access_token_md = grpc_mdelem_ref(c->access_token_md);
+ cached_access_token_md = grpc_credentials_md_store_ref(c->access_token_md);
}
gpr_mu_unlock(&c->mu);
}
if (cached_access_token_md != NULL) {
- cb(user_data, &cached_access_token_md, 1, GRPC_CREDENTIALS_OK);
- grpc_mdelem_unref(cached_access_token_md);
+ cb(user_data, cached_access_token_md->entries,
+ cached_access_token_md->num_entries, GRPC_CREDENTIALS_OK);
+ grpc_credentials_md_store_unref(cached_access_token_md);
} else {
c->fetch_func(
grpc_credentials_metadata_request_create(creds, cb, user_data),
@@ -648,24 +622,15 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c,
c->base.type = GRPC_CREDENTIALS_TYPE_OAUTH2;
gpr_ref_init(&c->base.refcount, 1);
gpr_mu_init(&c->mu);
- c->md_ctx = grpc_mdctx_create();
c->token_expiration = gpr_inf_past;
c->fetch_func = fetch_func;
}
-static grpc_mdctx *oauth2_token_fetcher_get_metadata_context(
- grpc_credentials *creds) {
- grpc_oauth2_token_fetcher_credentials *c =
- (grpc_oauth2_token_fetcher_credentials *)creds;
- return c->md_ctx;
-}
-
/* -- ComputeEngine credentials. -- */
static grpc_credentials_vtable compute_engine_vtable = {
oauth2_token_fetcher_destroy, oauth2_token_fetcher_has_request_metadata,
oauth2_token_fetcher_has_request_metadata_only,
- oauth2_token_fetcher_get_metadata_context,
oauth2_token_fetcher_get_request_metadata, NULL};
static void compute_engine_fetch_oauth2(
@@ -709,7 +674,6 @@ static void service_account_destroy(grpc_credentials *creds) {
static grpc_credentials_vtable service_account_vtable = {
service_account_destroy, oauth2_token_fetcher_has_request_metadata,
oauth2_token_fetcher_has_request_metadata_only,
- oauth2_token_fetcher_get_metadata_context,
oauth2_token_fetcher_get_request_metadata, NULL};
static void service_account_fetch_oauth2(
@@ -783,7 +747,6 @@ static void refresh_token_destroy(grpc_credentials *creds) {
static grpc_credentials_vtable refresh_token_vtable = {
refresh_token_destroy, oauth2_token_fetcher_has_request_metadata,
oauth2_token_fetcher_has_request_metadata_only,
- oauth2_token_fetcher_get_metadata_context,
oauth2_token_fetcher_get_request_metadata, NULL};
static void refresh_token_fetch_oauth2(
@@ -832,17 +795,13 @@ grpc_credentials *grpc_refresh_token_credentials_create(
typedef struct {
grpc_credentials base;
- grpc_mdctx *md_ctx;
- grpc_mdelem *access_token_md;
+ grpc_credentials_md_store *access_token_md;
int is_async;
} grpc_fake_oauth2_credentials;
static void fake_oauth2_destroy(grpc_credentials *creds) {
grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)creds;
- if (c->access_token_md != NULL) {
- grpc_mdelem_unref(c->access_token_md);
- }
- grpc_mdctx_unref(c->md_ctx);
+ grpc_credentials_md_store_unref(c->access_token_md);
gpr_free(c);
}
@@ -860,7 +819,8 @@ void on_simulated_token_fetch_done(void *user_data, int success) {
(grpc_credentials_metadata_request *)user_data;
grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)r->creds;
GPR_ASSERT(success);
- r->cb(r->user_data, &c->access_token_md, 1, GRPC_CREDENTIALS_OK);
+ r->cb(r->user_data, c->access_token_md->entries,
+ c->access_token_md->num_entries, GRPC_CREDENTIALS_OK);
grpc_credentials_metadata_request_destroy(r);
}
@@ -875,19 +835,14 @@ static void fake_oauth2_get_request_metadata(grpc_credentials *creds,
on_simulated_token_fetch_done,
grpc_credentials_metadata_request_create(creds, cb, user_data));
} else {
- cb(user_data, &c->access_token_md, 1, GRPC_CREDENTIALS_OK);
+ cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK);
}
}
-static grpc_mdctx *fake_oauth2_get_metadata_context(grpc_credentials *creds) {
- grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)creds;
- return c->md_ctx;
-}
-
static grpc_credentials_vtable fake_oauth2_vtable = {
fake_oauth2_destroy, fake_oauth2_has_request_metadata,
- fake_oauth2_has_request_metadata_only, fake_oauth2_get_metadata_context,
- fake_oauth2_get_request_metadata, NULL};
+ fake_oauth2_has_request_metadata_only, fake_oauth2_get_request_metadata,
+ NULL};
grpc_credentials *grpc_fake_oauth2_credentials_create(
const char *token_md_value, int is_async) {
@@ -897,9 +852,9 @@ grpc_credentials *grpc_fake_oauth2_credentials_create(
c->base.type = GRPC_CREDENTIALS_TYPE_OAUTH2;
c->base.vtable = &fake_oauth2_vtable;
gpr_ref_init(&c->base.refcount, 1);
- c->md_ctx = grpc_mdctx_create();
- c->access_token_md = grpc_mdelem_from_strings(
- c->md_ctx, GRPC_AUTHORIZATION_METADATA_KEY, token_md_value);
+ c->access_token_md = grpc_credentials_md_store_create(1);
+ grpc_credentials_md_store_add_cstrings(
+ c->access_token_md, GRPC_AUTHORIZATION_METADATA_KEY, token_md_value);
c->is_async = is_async;
return &c->base;
}
@@ -926,11 +881,6 @@ static int fake_transport_security_has_request_metadata_only(
return 0;
}
-static grpc_mdctx *fake_transport_security_get_metadata_context(
- grpc_credentials *c) {
- return NULL;
-}
-
static grpc_security_status
fake_transport_security_create_security_connector(
grpc_credentials *c, const char *target, const grpc_channel_args *args,
@@ -950,8 +900,7 @@ fake_transport_security_server_create_security_connector(
static grpc_credentials_vtable fake_transport_security_credentials_vtable = {
fake_transport_security_credentials_destroy,
fake_transport_security_has_request_metadata,
- fake_transport_security_has_request_metadata_only,
- fake_transport_security_get_metadata_context, NULL,
+ fake_transport_security_has_request_metadata_only, NULL,
fake_transport_security_create_security_connector};
static grpc_server_credentials_vtable
@@ -988,8 +937,7 @@ typedef struct {
typedef struct {
grpc_composite_credentials *composite_creds;
size_t creds_index;
- grpc_mdelem **md_elems;
- size_t num_md;
+ grpc_credentials_md_store *md_elems;
char *service_url;
void *user_data;
grpc_credentials_metadata_cb cb;
@@ -1031,21 +979,16 @@ static int composite_has_request_metadata_only(const grpc_credentials *creds) {
static void composite_md_context_destroy(
grpc_composite_credentials_metadata_context *ctx) {
- size_t i;
- for (i = 0; i < ctx->num_md; i++) {
- grpc_mdelem_unref(ctx->md_elems[i]);
- }
- gpr_free(ctx->md_elems);
+ grpc_credentials_md_store_unref(ctx->md_elems);
if (ctx->service_url != NULL) gpr_free(ctx->service_url);
gpr_free(ctx);
}
-static void composite_metadata_cb(void *user_data, grpc_mdelem **md_elems,
- size_t num_md,
+static void composite_metadata_cb(void *user_data,
+ grpc_credentials_md *md_elems, size_t num_md,
grpc_credentials_status status) {
grpc_composite_credentials_metadata_context *ctx =
(grpc_composite_credentials_metadata_context *)user_data;
- size_t i;
if (status != GRPC_CREDENTIALS_OK) {
ctx->cb(ctx->user_data, NULL, 0, status);
return;
@@ -1053,12 +996,11 @@ static void composite_metadata_cb(void *user_data, grpc_mdelem **md_elems,
/* Copy the metadata in the context. */
if (num_md > 0) {
- ctx->md_elems = gpr_realloc(ctx->md_elems,
- (ctx->num_md + num_md) * sizeof(grpc_mdelem *));
+ size_t i;
for (i = 0; i < num_md; i++) {
- ctx->md_elems[i + ctx->num_md] = grpc_mdelem_ref(md_elems[i]);
+ grpc_credentials_md_store_add(ctx->md_elems, md_elems[i].key,
+ md_elems[i].value);
}
- ctx->num_md += num_md;
}
/* See if we need to get some more metadata. */
@@ -1073,7 +1015,8 @@ static void composite_metadata_cb(void *user_data, grpc_mdelem **md_elems,
}
/* We're done!. */
- ctx->cb(ctx->user_data, ctx->md_elems, ctx->num_md, GRPC_CREDENTIALS_OK);
+ ctx->cb(ctx->user_data, ctx->md_elems->entries, ctx->md_elems->num_entries,
+ GRPC_CREDENTIALS_OK);
composite_md_context_destroy(ctx);
}
@@ -1093,6 +1036,7 @@ static void composite_get_request_metadata(grpc_credentials *creds,
ctx->user_data = user_data;
ctx->cb = cb;
ctx->composite_creds = c;
+ ctx->md_elems = grpc_credentials_md_store_create(c->inner.num_creds);
while (ctx->creds_index < c->inner.num_creds) {
grpc_credentials *inner_creds = c->inner.creds_array[ctx->creds_index++];
if (grpc_credentials_has_request_metadata(inner_creds)) {
@@ -1104,25 +1048,6 @@ static void composite_get_request_metadata(grpc_credentials *creds,
GPR_ASSERT(0); /* Should have exited before. */
}
-static grpc_mdctx *composite_get_metadata_context(grpc_credentials *creds) {
- grpc_composite_credentials *c = (grpc_composite_credentials *)creds;
- grpc_mdctx *ctx = NULL;
- size_t i;
- for (i = 0; i < c->inner.num_creds; i++) {
- grpc_credentials *inner_creds = c->inner.creds_array[i];
- grpc_mdctx *inner_ctx = NULL;
- if (inner_creds->vtable->get_metadata_context != NULL) {
- inner_ctx = inner_creds->vtable->get_metadata_context(inner_creds);
- }
- if (inner_ctx) {
- GPR_ASSERT(ctx == NULL &&
- "can only have one metadata context per composite credential");
- ctx = inner_ctx;
- }
- }
- return ctx;
-}
-
static grpc_security_status composite_create_security_connector(
grpc_credentials *creds, const char *target, const grpc_channel_args *args,
grpc_credentials *request_metadata_creds,
@@ -1139,8 +1064,8 @@ static grpc_security_status composite_create_security_connector(
static grpc_credentials_vtable composite_credentials_vtable = {
composite_destroy, composite_has_request_metadata,
- composite_has_request_metadata_only, composite_get_metadata_context,
- composite_get_request_metadata, composite_create_security_connector};
+ composite_has_request_metadata_only, composite_get_request_metadata,
+ composite_create_security_connector};
static grpc_credentials_array get_creds_array(grpc_credentials **creds_addr) {
grpc_credentials_array result;
@@ -1237,16 +1162,12 @@ grpc_credentials *grpc_credentials_contains_type(
typedef struct {
grpc_credentials base;
- grpc_mdctx *md_ctx;
- grpc_mdelem *token_md;
- grpc_mdelem *authority_selector_md;
+ grpc_credentials_md_store *iam_md;
} grpc_iam_credentials;
static void iam_destroy(grpc_credentials *creds) {
grpc_iam_credentials *c = (grpc_iam_credentials *)creds;
- grpc_mdelem_unref(c->token_md);
- grpc_mdelem_unref(c->authority_selector_md);
- grpc_mdctx_unref(c->md_ctx);
+ grpc_credentials_md_store_unref(c->iam_md);
gpr_free(c);
}
@@ -1263,20 +1184,13 @@ static void iam_get_request_metadata(grpc_credentials *creds,
grpc_credentials_metadata_cb cb,
void *user_data) {
grpc_iam_credentials *c = (grpc_iam_credentials *)creds;
- grpc_mdelem *md_array[2];
- md_array[0] = c->token_md;
- md_array[1] = c->authority_selector_md;
- cb(user_data, md_array, 2, GRPC_CREDENTIALS_OK);
-}
-
-static grpc_mdctx *iam_get_metadata_context(grpc_credentials *creds) {
- grpc_iam_credentials *c = (grpc_iam_credentials *)creds;
- return c->md_ctx;
+ cb(user_data, c->iam_md->entries, c->iam_md->num_entries,
+ GRPC_CREDENTIALS_OK);
}
static grpc_credentials_vtable iam_vtable = {
iam_destroy, iam_has_request_metadata, iam_has_request_metadata_only,
- iam_get_metadata_context, iam_get_request_metadata, NULL};
+ iam_get_request_metadata, NULL};
grpc_credentials *grpc_iam_credentials_create(const char *token,
const char *authority_selector) {
@@ -1288,10 +1202,10 @@ grpc_credentials *grpc_iam_credentials_create(const char *token,
c->base.type = GRPC_CREDENTIALS_TYPE_IAM;
c->base.vtable = &iam_vtable;
gpr_ref_init(&c->base.refcount, 1);
- c->md_ctx = grpc_mdctx_create();
- c->token_md = grpc_mdelem_from_strings(
- c->md_ctx, GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, token);
- c->authority_selector_md = grpc_mdelem_from_strings(
- c->md_ctx, GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, authority_selector);
+ c->iam_md = grpc_credentials_md_store_create(2);
+ grpc_credentials_md_store_add_cstrings(
+ c->iam_md, GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, token);
+ grpc_credentials_md_store_add_cstrings(
+ c->iam_md, GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, authority_selector);
return &c->base;
}
diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h
index 87c773e49a..4768ce6990 100644
--- a/src/core/security/credentials.h
+++ b/src/core/security/credentials.h
@@ -82,13 +82,40 @@ typedef enum {
#define GRPC_REFRESH_TOKEN_POST_BODY_FORMAT_STRING \
"client_id=%s&client_secret=%s&refresh_token=%s&grant_type=refresh_token"
+/* --- grpc_credentials_md. --- */
+
+typedef struct {
+ gpr_slice key;
+ gpr_slice value;
+} grpc_credentials_md;
+
+typedef struct {
+ grpc_credentials_md *entries;
+ size_t num_entries;
+ size_t allocated;
+ gpr_refcount refcount;
+} grpc_credentials_md_store;
+
+grpc_credentials_md_store *grpc_credentials_md_store_create(
+ size_t initial_capacity);
+
+/* Will ref key and value. */
+void grpc_credentials_md_store_add(grpc_credentials_md_store *store,
+ gpr_slice key, gpr_slice value);
+void grpc_credentials_md_store_add_cstrings(grpc_credentials_md_store *store,
+ const char *key, const char *value);
+grpc_credentials_md_store *grpc_credentials_md_store_ref(
+ grpc_credentials_md_store *store);
+void grpc_credentials_md_store_unref(grpc_credentials_md_store *store);
+
+
/* --- grpc_credentials. --- */
/* It is the caller's responsibility to gpr_free the result if not NULL. */
char *grpc_get_well_known_google_credentials_file_path(void);
typedef void (*grpc_credentials_metadata_cb)(void *user_data,
- grpc_mdelem **md_elems,
+ grpc_credentials_md *md_elems,
size_t num_md,
grpc_credentials_status status);
@@ -96,7 +123,6 @@ typedef struct {
void (*destroy)(grpc_credentials *c);
int (*has_request_metadata)(const grpc_credentials *c);
int (*has_request_metadata_only)(const grpc_credentials *c);
- grpc_mdctx *(*get_metadata_context)(grpc_credentials *c);
void (*get_request_metadata)(grpc_credentials *c,
const char *service_url,
grpc_credentials_metadata_cb cb,
@@ -123,11 +149,6 @@ void grpc_credentials_get_request_metadata(grpc_credentials *creds,
grpc_credentials_metadata_cb cb,
void *user_data);
-/* Gets the mdctx from the credentials and increase the refcount if it exists,
- otherwise, create a new one. */
-grpc_mdctx *grpc_credentials_get_or_create_metadata_context(
- grpc_credentials *creds);
-
/* Creates a security connector for the channel. May also create new channel
args for the channel to be used in place of the passed in const args if
returned non NULL. In that case the caller is responsible for destroying
@@ -155,9 +176,9 @@ grpc_credentials *grpc_credentials_contains_type(
/* Exposed for testing only. */
grpc_credentials_status
- grpc_oauth2_token_fetcher_credentials_parse_server_response(
- const struct grpc_httpcli_response *response, grpc_mdctx *ctx,
- grpc_mdelem **token_elem, gpr_timespec *token_lifetime);
+grpc_oauth2_token_fetcher_credentials_parse_server_response(
+ const struct grpc_httpcli_response *response, grpc_credentials_md_store **token_md,
+ gpr_timespec *token_lifetime);
/* Simulates an oauth2 token fetch with the specified value for testing. */
grpc_credentials *grpc_fake_oauth2_credentials_create(
diff --git a/src/core/security/credentials_metadata.c b/src/core/security/credentials_metadata.c
new file mode 100644
index 0000000000..22c786be56
--- /dev/null
+++ b/src/core/security/credentials_metadata.c
@@ -0,0 +1,101 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/security/credentials.h"
+
+#include <grpc/support/alloc.h>
+
+#include <string.h>
+
+static void store_ensure_capacity(grpc_credentials_md_store *store) {
+ if (store->num_entries == store->allocated) {
+ store->allocated = (store->allocated == 0) ? 1 : store->allocated * 2;
+ store->entries = gpr_realloc(
+ store->entries, store->allocated * sizeof(grpc_credentials_md));
+ }
+}
+
+grpc_credentials_md_store *grpc_credentials_md_store_create(
+ size_t initial_capacity) {
+ grpc_credentials_md_store *store = gpr_malloc(sizeof(grpc_credentials_md_store));
+ memset(store, 0, sizeof(grpc_credentials_md_store));
+ if (initial_capacity > 0) {
+ store->entries = gpr_malloc(initial_capacity * sizeof(grpc_credentials_md));
+ store->allocated = initial_capacity;
+ }
+ gpr_ref_init(&store->refcount, 1);
+ return store;
+}
+
+void grpc_credentials_md_store_add(grpc_credentials_md_store *store,
+ gpr_slice key, gpr_slice value) {
+ if (store == NULL) return;
+ store_ensure_capacity(store);
+ store->entries[store->num_entries].key = gpr_slice_ref(key);
+ store->entries[store->num_entries].value = gpr_slice_ref(value);
+ store->num_entries++;
+}
+
+void grpc_credentials_md_store_add_cstrings(grpc_credentials_md_store *store,
+ const char *key,
+ const char *value) {
+ if (store == NULL) return;
+ store_ensure_capacity(store);
+ store->entries[store->num_entries].key = gpr_slice_from_copied_string(key);
+ store->entries[store->num_entries].value =
+ gpr_slice_from_copied_string(value);
+ store->num_entries++;
+}
+
+grpc_credentials_md_store *grpc_credentials_md_store_ref(
+ grpc_credentials_md_store *store) {
+ if (store == NULL) return NULL;
+ gpr_ref(&store->refcount);
+ return store;
+}
+
+void grpc_credentials_md_store_unref(grpc_credentials_md_store *store) {
+ if (store == NULL) return;
+ if (gpr_unref(&store->refcount)) {
+ if (store->entries != NULL) {
+ size_t i;
+ for (i = 0; i < store->num_entries; i++) {
+ gpr_slice_unref(store->entries[i].key);
+ gpr_slice_unref(store->entries[i].value);
+ }
+ gpr_free(store->entries);
+ }
+ gpr_free(store);
+ }
+}
+
diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c
index 61cb20f6b9..11505f8cb0 100644
--- a/src/core/security/security_connector.c
+++ b/src/core/security/security_connector.c
@@ -37,6 +37,7 @@
#include "src/core/security/credentials.h"
#include "src/core/security/secure_endpoint.h"
+#include "src/core/security/security_context.h"
#include "src/core/support/env.h"
#include "src/core/support/file.h"
#include "src/core/support/string.h"
@@ -82,7 +83,7 @@ static const char *ssl_cipher_suites(void) {
/* -- Common methods. -- */
/* Returns the first property with that name. */
-static const tsi_peer_property *tsi_peer_get_property_by_name(
+const tsi_peer_property *tsi_peer_get_property_by_name(
const tsi_peer *peer, const char *name) {
size_t i;
if (peer == NULL) return NULL;
@@ -194,10 +195,14 @@ typedef struct {
static void fake_channel_destroy(grpc_security_connector *sc) {
grpc_channel_security_connector *c = (grpc_channel_security_connector *)sc;
grpc_credentials_unref(c->request_metadata_creds);
+ grpc_auth_context_unref(sc->auth_context);
gpr_free(sc);
}
-static void fake_server_destroy(grpc_security_connector *sc) { gpr_free(sc); }
+static void fake_server_destroy(grpc_security_connector *sc) {
+ grpc_auth_context_unref(sc->auth_context);
+ gpr_free(sc);
+}
static grpc_security_status fake_channel_create_handshaker(
grpc_security_connector *sc, tsi_handshaker **handshaker) {
@@ -236,6 +241,12 @@ static grpc_security_status fake_check_peer(grpc_security_connector *sc,
status = GRPC_SECURITY_ERROR;
goto end;
}
+ grpc_auth_context_unref(sc->auth_context);
+ sc->auth_context = grpc_auth_context_create(NULL, 1);
+ sc->auth_context->properties[0] = grpc_auth_property_init_from_cstring(
+ GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME,
+ GRPC_FAKE_TRANSPORT_SECURITY_TYPE);
+
end:
tsi_peer_destruct(&peer);
return status;
@@ -264,6 +275,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
grpc_credentials *request_metadata_creds, int call_host_check_is_async) {
grpc_fake_channel_security_connector *c =
gpr_malloc(sizeof(grpc_fake_channel_security_connector));
+ memset(c, 0, sizeof(grpc_fake_channel_security_connector));
gpr_ref_init(&c->base.base.refcount, 1);
c->base.base.is_client_side = 1;
c->base.base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
@@ -277,7 +289,9 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
grpc_security_connector *grpc_fake_server_security_connector_create(void) {
grpc_security_connector *c = gpr_malloc(sizeof(grpc_security_connector));
+ memset(c, 0, sizeof(grpc_security_connector));
gpr_ref_init(&c->refcount, 1);
+ c->is_client_side = 0;
c->vtable = &fake_server_vtable;
c->url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
return c;
@@ -308,6 +322,7 @@ static void ssl_channel_destroy(grpc_security_connector *sc) {
if (c->target_name != NULL) gpr_free(c->target_name);
if (c->overridden_target_name != NULL) gpr_free(c->overridden_target_name);
tsi_peer_destruct(&c->peer);
+ grpc_auth_context_unref(sc->auth_context);
gpr_free(sc);
}
@@ -317,6 +332,7 @@ static void ssl_server_destroy(grpc_security_connector *sc) {
if (c->handshaker_factory != NULL) {
tsi_ssl_handshaker_factory_destroy(c->handshaker_factory);
}
+ grpc_auth_context_unref(sc->auth_context);
gpr_free(sc);
}
@@ -369,7 +385,51 @@ static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) {
return r;
}
-static grpc_security_status ssl_check_peer(const char *peer_name,
+static grpc_auth_context *tsi_ssl_peer_to_auth_context(const tsi_peer *peer) {
+ /* We bet that iterating over a handful of properties twice will be faster
+ than having to realloc on average . */
+ size_t auth_prop_count = 1; /* for transport_security_type. */
+ size_t i;
+ const char *peer_identity_property_name = NULL;
+ grpc_auth_context *ctx = NULL;
+ for (i = 0; i < peer->property_count; i++) {
+ const tsi_peer_property *prop = &peer->properties[i];
+ if (prop->name == NULL) continue;
+ if (strcmp(prop->name, TSI_X509_SUBJECT_COMMON_NAME_PEER_PROPERTY) == 0) {
+ auth_prop_count++;
+ /* If there is no subject alt name, have the CN as the identity. */
+ if (peer_identity_property_name == NULL) {
+ peer_identity_property_name = prop->name;
+ }
+ } else if (strcmp(prop->name,
+ TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY) == 0) {
+ auth_prop_count++;
+ peer_identity_property_name = prop->name;
+ }
+ }
+ ctx = grpc_auth_context_create(NULL, auth_prop_count);
+ ctx->properties[0] = grpc_auth_property_init_from_cstring(
+ GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME,
+ GRPC_SSL_TRANSPORT_SECURITY_TYPE);
+ ctx->property_count = 1;
+ for (i = 0; i < peer->property_count; i++) {
+ const tsi_peer_property *prop = &peer->properties[i];
+ if (prop->name == NULL) continue;
+ if (strcmp(prop->name, TSI_X509_SUBJECT_COMMON_NAME_PEER_PROPERTY) == 0) {
+ ctx->properties[ctx->property_count++] = grpc_auth_property_init(
+ GRPC_X509_CN_PROPERTY_NAME, prop->value.data, prop->value.length);
+ } else if (strcmp(prop->name,
+ TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY) == 0) {
+ ctx->properties[ctx->property_count++] = grpc_auth_property_init(
+ GRPC_X509_SAN_PROPERTY_NAME, prop->value.data, prop->value.length);
+ }
+ }
+ GPR_ASSERT(auth_prop_count == ctx->property_count);
+ return ctx;
+}
+
+static grpc_security_status ssl_check_peer(grpc_security_connector *sc,
+ const char *peer_name,
const tsi_peer *peer) {
/* Check the ALPN. */
const tsi_peer_property *p =
@@ -388,7 +448,7 @@ static grpc_security_status ssl_check_peer(const char *peer_name,
gpr_log(GPR_ERROR, "Peer name %s is not in peer certificate", peer_name);
return GRPC_SECURITY_ERROR;
}
-
+ sc->auth_context = tsi_ssl_peer_to_auth_context(peer);
return GRPC_SECURITY_OK;
}
@@ -401,9 +461,9 @@ static grpc_security_status ssl_channel_check_peer(grpc_security_connector *sc,
grpc_security_status status;
tsi_peer_destruct(&c->peer);
c->peer = peer;
- status = ssl_check_peer(c->overridden_target_name != NULL
- ? c->overridden_target_name
- : c->target_name,
+ status = ssl_check_peer(sc, c->overridden_target_name != NULL
+ ? c->overridden_target_name
+ : c->target_name,
&peer);
return status;
}
@@ -412,8 +472,7 @@ static grpc_security_status ssl_server_check_peer(grpc_security_connector *sc,
tsi_peer peer,
grpc_security_check_cb cb,
void *user_data) {
- /* TODO(jboeuf): Find a way to expose the peer to the authorization layer. */
- grpc_security_status status = ssl_check_peer(NULL, &peer);
+ grpc_security_status status = ssl_check_peer(sc, NULL, &peer);
tsi_peer_destruct(&peer);
return status;
}
diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h
index 47abe05cff..0617041448 100644
--- a/src/core/security/security_connector.h
+++ b/src/core/security/security_connector.h
@@ -77,6 +77,7 @@ struct grpc_security_connector {
gpr_refcount refcount;
int is_client_side;
const char *url_scheme;
+ grpc_auth_context *auth_context; /* Populated after the peer is checked. */
};
/* Increments the refcount. */
@@ -198,4 +199,8 @@ typedef struct {
grpc_security_status grpc_ssl_server_security_connector_create(
const grpc_ssl_server_config *config, grpc_security_connector **sc);
+/* Util. */
+const tsi_peer_property *tsi_peer_get_property_by_name(
+ const tsi_peer *peer, const char *name);
+
#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONNECTOR_H */
diff --git a/src/core/security/security_context.c b/src/core/security/security_context.c
index b90dc5097a..14c194c8f6 100644
--- a/src/core/security/security_context.c
+++ b/src/core/security/security_context.c
@@ -35,11 +35,14 @@
#include "src/core/security/security_context.h"
#include "src/core/surface/call.h"
+#include "src/core/support/string.h"
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+/* --- grpc_call --- */
+
grpc_call_error grpc_call_set_credentials(grpc_call *call,
grpc_credentials *creds) {
grpc_client_security_context *ctx = NULL;
@@ -65,6 +68,16 @@ grpc_call_error grpc_call_set_credentials(grpc_call *call,
return GRPC_CALL_OK;
}
+const grpc_auth_context *grpc_call_auth_context(grpc_call *call) {
+ void *sec_ctx = grpc_call_context_get(call, GRPC_CONTEXT_SECURITY);
+ if (sec_ctx == NULL) return NULL;
+ return grpc_call_is_client(call)
+ ? ((grpc_client_security_context *)sec_ctx)->auth_context
+ : ((grpc_server_security_context *)sec_ctx)->auth_context;
+}
+
+/* --- grpc_client_security_context --- */
+
grpc_client_security_context *grpc_client_security_context_create(void) {
grpc_client_security_context *ctx =
gpr_malloc(sizeof(grpc_client_security_context));
@@ -75,5 +88,142 @@ grpc_client_security_context *grpc_client_security_context_create(void) {
void grpc_client_security_context_destroy(void *ctx) {
grpc_client_security_context *c = (grpc_client_security_context *)ctx;
grpc_credentials_unref(c->creds);
+ grpc_auth_context_unref(c->auth_context);
+ gpr_free(ctx);
+}
+
+/* --- grpc_server_security_context --- */
+
+grpc_server_security_context *grpc_server_security_context_create(void) {
+ grpc_server_security_context *ctx =
+ gpr_malloc(sizeof(grpc_server_security_context));
+ memset(ctx, 0, sizeof(grpc_server_security_context));
+ return ctx;
+}
+
+void grpc_server_security_context_destroy(void *ctx) {
+ grpc_server_security_context *c = (grpc_server_security_context *)ctx;
+ grpc_auth_context_unref(c->auth_context);
gpr_free(ctx);
}
+
+/* --- grpc_auth_context --- */
+
+static grpc_auth_property_iterator empty_iterator = {NULL, 0, NULL};
+
+grpc_auth_context *grpc_auth_context_create(grpc_auth_context *chained,
+ size_t property_count) {
+ grpc_auth_context *ctx = gpr_malloc(sizeof(grpc_auth_context));
+ memset(ctx, 0, sizeof(grpc_auth_context));
+ ctx->properties = gpr_malloc(property_count * sizeof(grpc_auth_property));
+ memset(ctx->properties, 0, property_count * sizeof(grpc_auth_property));
+ ctx->property_count = property_count;
+ gpr_ref_init(&ctx->refcount, 1);
+ if (chained != NULL) ctx->chained = grpc_auth_context_ref(chained);
+ return ctx;
+}
+
+grpc_auth_context *grpc_auth_context_ref(grpc_auth_context *ctx) {
+ if (ctx == NULL) return NULL;
+ gpr_ref(&ctx->refcount);
+ return ctx;
+}
+
+void grpc_auth_context_unref(grpc_auth_context *ctx) {
+ if (ctx == NULL) return;
+ if (gpr_unref(&ctx->refcount)) {
+ size_t i;
+ grpc_auth_context_unref(ctx->chained);
+ if (ctx->properties != NULL) {
+ for (i = 0; i < ctx->property_count; i++) {
+ grpc_auth_property_reset(&ctx->properties[i]);
+ }
+ gpr_free(ctx->properties);
+ }
+ gpr_free(ctx);
+ }
+}
+
+const char *grpc_auth_context_peer_identity_property_name(
+ const grpc_auth_context *ctx) {
+ return ctx->peer_identity_property_name;
+}
+
+int grpc_auth_context_peer_is_authenticated(
+ const grpc_auth_context *ctx) {
+ return ctx->peer_identity_property_name == NULL ? 0 : 1;
+}
+
+grpc_auth_property_iterator grpc_auth_context_property_iterator(
+ const grpc_auth_context *ctx) {
+ grpc_auth_property_iterator it = empty_iterator;
+ if (ctx == NULL) return it;
+ it.ctx = ctx;
+ return it;
+}
+
+const grpc_auth_property *grpc_auth_property_iterator_next(
+ grpc_auth_property_iterator *it) {
+ if (it == NULL || it->ctx == NULL) return NULL;
+ while (it->index == it->ctx->property_count) {
+ if (it->ctx->chained == NULL) return NULL;
+ it->ctx = it->ctx->chained;
+ it->index = 0;
+ }
+ if (it->name == NULL) {
+ return &it->ctx->properties[it->index++];
+ } else {
+ while (it->index < it->ctx->property_count) {
+ const grpc_auth_property *prop = &it->ctx->properties[it->index++];
+ GPR_ASSERT(prop->name != NULL);
+ if (strcmp(it->name, prop->name) == 0) {
+ return prop;
+ }
+ }
+ /* We could not find the name, try another round. */
+ return grpc_auth_property_iterator_next(it);
+ }
+}
+
+grpc_auth_property_iterator grpc_auth_context_find_properties_by_name(
+ const grpc_auth_context *ctx, const char *name) {
+ grpc_auth_property_iterator it = empty_iterator;
+ if (ctx == NULL || name == NULL) return empty_iterator;
+ it.ctx = ctx;
+ it.name = name;
+ return it;
+}
+
+grpc_auth_property_iterator grpc_auth_context_peer_identity(
+ const grpc_auth_context *ctx) {
+ if (ctx == NULL) return empty_iterator;
+ return grpc_auth_context_find_properties_by_name(
+ ctx, ctx->peer_identity_property_name);
+}
+
+grpc_auth_property grpc_auth_property_init_from_cstring(const char *name,
+ const char *value) {
+ grpc_auth_property prop;
+ prop.name = gpr_strdup(name);
+ prop.value = gpr_strdup(value);
+ prop.value_length = strlen(value);
+ return prop;
+}
+
+grpc_auth_property grpc_auth_property_init(const char *name, const char *value,
+ size_t value_length) {
+ grpc_auth_property prop;
+ prop.name = gpr_strdup(name);
+ prop.value = gpr_malloc(value_length + 1);
+ memcpy(prop.value, value, value_length);
+ prop.value[value_length] = '\0';
+ prop.value_length = value_length;
+ return prop;
+}
+
+void grpc_auth_property_reset(grpc_auth_property *property) {
+ if (property->name != NULL) gpr_free(property->name);
+ if (property->value != NULL) gpr_free(property->value);
+ memset(property, 0, sizeof(grpc_auth_property));
+}
+
diff --git a/src/core/security/security_context.h b/src/core/security/security_context.h
index 561633b452..d8909cd6f1 100644
--- a/src/core/security/security_context.h
+++ b/src/core/security/security_context.h
@@ -36,13 +36,59 @@
#include "src/core/security/credentials.h"
-/* Security context attached to a client-side call. */
+/* --- grpc_auth_context ---
+
+ High level authentication context object. Can optionally be chained. */
+
+/* Property names are always NULL terminated. */
+
+struct grpc_auth_context {
+ struct grpc_auth_context *chained;
+ grpc_auth_property *properties;
+ size_t property_count;
+ gpr_refcount refcount;
+ const char *peer_identity_property_name;
+};
+
+/* Constructor. */
+grpc_auth_context *grpc_auth_context_create(grpc_auth_context *chained,
+ size_t property_count);
+
+/* Refcounting. */
+grpc_auth_context *grpc_auth_context_ref(
+ grpc_auth_context *ctx);
+void grpc_auth_context_unref(grpc_auth_context *ctx);
+
+grpc_auth_property grpc_auth_property_init_from_cstring(const char *name,
+ const char *value);
+
+grpc_auth_property grpc_auth_property_init(const char *name, const char *value,
+ size_t value_length);
+
+void grpc_auth_property_reset(grpc_auth_property *property);
+
+/* --- grpc_client_security_context ---
+
+ Internal client-side security context. */
+
typedef struct {
grpc_credentials *creds;
+ grpc_auth_context *auth_context;
} grpc_client_security_context;
grpc_client_security_context *grpc_client_security_context_create(void);
void grpc_client_security_context_destroy(void *ctx);
+/* --- grpc_server_security_context ---
+
+ Internal server-side security context. */
+
+typedef struct {
+ grpc_auth_context *auth_context;
+} grpc_server_security_context;
+
+grpc_server_security_context *grpc_server_security_context_create(void);
+void grpc_server_security_context_destroy(void *ctx);
+
#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONTEXT_H */
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
new file mode 100644
index 0000000000..1823f75808
--- /dev/null
+++ b/src/core/security/server_auth_filter.c
@@ -0,0 +1,128 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/security/auth_filters.h"
+#include "src/core/security/security_connector.h"
+#include "src/core/security/security_context.h"
+
+#include <grpc/support/log.h>
+
+typedef struct call_data {
+ int unused; /* C89 requires at least one struct element */
+} call_data;
+
+typedef struct channel_data {
+ grpc_security_connector *security_connector;
+} channel_data;
+
+/* Called either:
+ - in response to an API call (or similar) from above, to send something
+ - a network event (or similar) from below, to receive something
+ op contains type and call direction information, in addition to the data
+ that is being sent or received. */
+static void auth_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ /* TODO(jboeuf): Get the metadata and get a new context from it. */
+
+ /* pass control down the stack */
+ grpc_call_next_op(elem, op);
+}
+
+/* Called on special channel events, such as disconnection or new incoming
+ calls on the server */
+static void channel_op(grpc_channel_element *elem,
+ grpc_channel_element *from_elem, grpc_channel_op *op) {
+ grpc_channel_next_op(elem, op);
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_server_security_context *server_ctx = NULL;
+
+ /* initialize members */
+ calld->unused = 0;
+
+ GPR_ASSERT(initial_op && initial_op->context != NULL &&
+ chand->security_connector->auth_context != NULL &&
+ initial_op->context[GRPC_CONTEXT_SECURITY].value == NULL);
+
+ /* Create a security context for the call and reference the auth context from
+ the channel. */
+ server_ctx = grpc_server_security_context_create();
+ server_ctx->auth_context =
+ grpc_auth_context_ref(chand->security_connector->auth_context);
+ initial_op->context[GRPC_CONTEXT_SECURITY].value = server_ctx;
+ initial_op->context[GRPC_CONTEXT_SECURITY].destroy =
+ grpc_server_security_context_destroy;
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_call_element *elem) {
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
+ grpc_security_connector *sc = grpc_find_security_connector_in_args(args);
+ /* grab pointers to our data from the channel element */
+ channel_data *chand = elem->channel_data;
+
+ /* The first and the last filters tend to be implemented differently to
+ handle the case that there's no 'next' filter to call on the up or down
+ path */
+ GPR_ASSERT(!is_first);
+ GPR_ASSERT(!is_last);
+ GPR_ASSERT(sc != NULL);
+
+ /* initialize members */
+ GPR_ASSERT(!sc->is_client_side);
+ chand->security_connector = grpc_security_connector_ref(sc);
+}
+
+/* Destructor for channel data */
+static void destroy_channel_elem(grpc_channel_element *elem) {
+ /* grab pointers to our data from the channel element */
+ channel_data *chand = elem->channel_data;
+ grpc_security_connector_unref(chand->security_connector);
+}
+
+const grpc_channel_filter grpc_server_auth_filter = {
+ auth_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "server-auth"};
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index db9d545c0e..3519930f38 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -35,10 +35,12 @@
#include <string.h>
+#include "src/core/channel/channel_args.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_server.h"
+#include "src/core/security/auth_filters.h"
#include "src/core/security/credentials.h"
#include "src/core/security/security_connector.h"
#include "src/core/security/secure_transport_setup.h"
@@ -69,13 +71,21 @@ static void state_unref(grpc_server_secure_state *state) {
}
}
-static grpc_transport_setup_result setup_transport(void *server,
+static grpc_transport_setup_result setup_transport(void *statep,
grpc_transport *transport,
grpc_mdctx *mdctx) {
static grpc_channel_filter const *extra_filters[] = {
- &grpc_http_server_filter};
- return grpc_server_setup_transport(server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx);
+ &grpc_server_auth_filter, &grpc_http_server_filter};
+ grpc_server_secure_state *state = statep;
+ grpc_transport_setup_result result;
+ grpc_arg connector_arg = grpc_security_connector_to_arg(state->sc);
+ grpc_channel_args *args_copy = grpc_channel_args_copy_and_add(
+ grpc_server_get_channel_args(state->server), &connector_arg);
+ result = grpc_server_setup_transport(state->server, transport, extra_filters,
+ GPR_ARRAY_SIZE(extra_filters), mdctx,
+ args_copy);
+ grpc_channel_args_destroy(args_copy);
+ return result;
}
static void on_secure_transport_setup_done(void *statep,
@@ -85,10 +95,9 @@ static void on_secure_transport_setup_done(void *statep,
if (status == GRPC_SECURITY_OK) {
gpr_mu_lock(&state->mu);
if (!state->is_shutdown) {
- grpc_create_chttp2_transport(setup_transport, state->server,
- grpc_server_get_channel_args(state->server),
- secure_endpoint, NULL, 0,
- grpc_mdctx_create(), 0);
+ grpc_create_chttp2_transport(
+ setup_transport, state, grpc_server_get_channel_args(state->server),
+ secure_endpoint, NULL, 0, grpc_mdctx_create(), 0);
} else {
/* We need to consume this here, because the server may already have gone
* away. */
diff --git a/src/core/support/cmdline.c b/src/core/support/cmdline.c
index 72f46c1bd7..530952c437 100644
--- a/src/core/support/cmdline.c
+++ b/src/core/support/cmdline.c
@@ -131,33 +131,63 @@ void gpr_cmdline_on_extra_arg(
cl->extra_arg_help = help;
}
-static void print_usage_and_die(gpr_cmdline *cl) {
+/* recursively descend argument list, adding the last element
+ to s first - so that arguments are added in the order they were
+ added to the list by api calls */
+static void add_args_to_usage(gpr_strvec *s, arg *a) {
+ char *tmp;
+
+ if (!a) return;
+ add_args_to_usage(s, a->next);
+
+ switch (a->type) {
+ case ARGTYPE_BOOL:
+ gpr_asprintf(&tmp, " [--%s|--no-%s]", a->name, a->name);
+ gpr_strvec_add(s, tmp);
+ break;
+ case ARGTYPE_STRING:
+ gpr_asprintf(&tmp, " [--%s=string]", a->name);
+ gpr_strvec_add(s, tmp);
+ break;
+ case ARGTYPE_INT:
+ gpr_asprintf(&tmp, " [--%s=int]", a->name);
+ gpr_strvec_add(s, tmp);
+ break;
+ }
+}
+
+char *gpr_cmdline_usage_string(gpr_cmdline *cl, const char *argv0) {
/* TODO(ctiller): make this prettier */
- arg *a;
- const char *name = strrchr(cl->argv0, '/');
+ gpr_strvec s;
+ char *tmp;
+ const char *name = strrchr(argv0, '/');
+
if (name) {
name++;
} else {
- name = cl->argv0;
- }
- fprintf(stderr, "Usage: %s", name);
- for (a = cl->args; a; a = a->next) {
- switch (a->type) {
- case ARGTYPE_BOOL:
- fprintf(stderr, " [--%s|--no-%s]", a->name, a->name);
- break;
- case ARGTYPE_STRING:
- fprintf(stderr, " [--%s=string]", a->name);
- break;
- case ARGTYPE_INT:
- fprintf(stderr, " [--%s=int]", a->name);
- break;
- }
+ name = argv0;
}
+
+ gpr_strvec_init(&s);
+
+ gpr_asprintf(&tmp, "Usage: %s", name);
+ gpr_strvec_add(&s, tmp);
+ add_args_to_usage(&s, cl->args);
if (cl->extra_arg) {
- fprintf(stderr, " [%s...]", cl->extra_arg_name);
+ gpr_asprintf(&tmp, " [%s...]", cl->extra_arg_name);
+ gpr_strvec_add(&s, tmp);
}
- fprintf(stderr, "\n");
+ gpr_strvec_add(&s, gpr_strdup("\n"));
+
+ tmp = gpr_strvec_flatten(&s, NULL);
+ gpr_strvec_destroy(&s);
+ return tmp;
+}
+
+static void print_usage_and_die(gpr_cmdline *cl) {
+ char *usage = gpr_cmdline_usage_string(cl, cl->argv0);
+ fprintf(stderr, "%s", usage);
+ gpr_free(usage);
exit(1);
}
diff --git a/src/core/support/subprocess_posix.c b/src/core/support/subprocess_posix.c
index 4580537aa8..b4631fa0ed 100644
--- a/src/core/support/subprocess_posix.c
+++ b/src/core/support/subprocess_posix.c
@@ -55,9 +55,9 @@ struct gpr_subprocess {
int joined;
};
-char *gpr_subprocess_binary_extension() { return ""; }
+const char *gpr_subprocess_binary_extension() { return ""; }
-gpr_subprocess *gpr_subprocess_create(int argc, char **argv) {
+gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) {
gpr_subprocess *r;
int pid;
char **exec_args;
@@ -92,7 +92,11 @@ void gpr_subprocess_destroy(gpr_subprocess *p) {
int gpr_subprocess_join(gpr_subprocess *p) {
int status;
+retry:
if (waitpid(p->pid, &status, 0) == -1) {
+ if (errno == EINTR) {
+ goto retry;
+ }
gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno));
return -1;
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index b0b922b642..7b28560a1d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -206,8 +206,8 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
- void *context[GRPC_CONTEXT_COUNT];
- void (*destroy_context[GRPC_CONTEXT_COUNT])(void *);
+ /* Contexts for various subsystems (security, tracing, ...). */
+ grpc_call_context_element context[GRPC_CONTEXT_COUNT];
/* Deadline alarm - if have_alarm is non-zero */
grpc_alarm alarm;
@@ -269,8 +269,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
if (call->is_client) {
call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
- call->context[GRPC_CONTEXT_TRACING] = grpc_census_context_create();
- call->destroy_context[GRPC_CONTEXT_TRACING] = grpc_census_context_destroy;
+ call->context[GRPC_CONTEXT_TRACING].value = grpc_census_context_create();
+ call->context[GRPC_CONTEXT_TRACING].destroy = grpc_census_context_destroy;
}
GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
for (i = 0; i < add_initial_metadata_count; i++) {
@@ -347,8 +347,8 @@ static void destroy_call(void *call, int ignored_success) {
grpc_mdelem_unref(c->send_initial_metadata[i].md);
}
for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
- if (c->destroy_context[i]) {
- c->destroy_context[i](c->context[i]);
+ if (c->context[i].destroy) {
+ c->context[i].destroy(c->context[i].value);
}
}
grpc_sopb_destroy(&c->send_ops);
@@ -404,6 +404,7 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static int need_more_data(grpc_call *call) {
+ if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
(is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
@@ -412,8 +413,7 @@ static int need_more_data(grpc_call *call) {
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
- (call->write_state == WRITE_STATE_INITIAL && !call->is_client &&
- call->read_state < READ_STATE_GOT_INITIAL_METADATA);
+ (call->write_state == WRITE_STATE_INITIAL && !call->is_client);
}
static void unlock(grpc_call *call) {
@@ -540,9 +540,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
switch ((grpc_ioreq_op)i) {
case GRPC_IOREQ_RECV_MESSAGE:
case GRPC_IOREQ_SEND_MESSAGE:
- if (master->success) {
- call->request_set[i] = REQSET_EMPTY;
- } else {
+ call->request_set[i] = REQSET_EMPTY;
+ if (!master->success) {
call->write_state = WRITE_STATE_WRITE_CLOSED;
}
break;
@@ -587,11 +586,29 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) {
}
}
+static void early_out_write_ops(grpc_call *call) {
+ switch (call->write_state) {
+ case WRITE_STATE_WRITE_CLOSED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
+ /* fallthrough */
+ case WRITE_STATE_STARTED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
+ /* fallthrough */
+ case WRITE_STATE_INITIAL:
+ /* do nothing */
+ break;
+ }
+}
+
static void call_on_done_send(void *pc, int success) {
grpc_call *call = pc;
lock(call);
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success);
+ call->write_state = WRITE_STATE_STARTED;
}
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success);
@@ -600,7 +617,13 @@ static void call_on_done_send(void *pc, int success) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ }
+ if (!success) {
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ early_out_write_ops(call);
}
+ call->send_ops.nops = 0;
call->last_send_contains = 0;
call->sending = 0;
unlock(call);
@@ -813,7 +836,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
op->send_ops = &call->send_ops;
op->bind_pollset = grpc_cq_pollset(call->cq);
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
- call->write_state = WRITE_STATE_STARTED;
call->send_initial_metadata_count = 0;
/* fall through intended */
case WRITE_STATE_STARTED:
@@ -829,7 +851,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
op->is_last_send = 1;
op->send_ops = &call->send_ops;
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
- call->write_state = WRITE_STATE_WRITE_CLOSED;
if (!call->is_client) {
/* send trailing metadata */
data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
@@ -921,23 +942,6 @@ static void finish_read_ops(grpc_call *call) {
}
}
-static void early_out_write_ops(grpc_call *call) {
- switch (call->write_state) {
- case WRITE_STATE_WRITE_CLOSED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
- /* fallthrough */
- case WRITE_STATE_STARTED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
- /* fallthrough */
- case WRITE_STATE_INITIAL:
- /* do nothing */
- break;
- }
-}
-
static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
size_t nreqs,
grpc_ioreq_completion_func completion,
@@ -1178,6 +1182,10 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, int success, void *tag) {
+ grpc_cq_end_op(call->cq, tag, call, success);
+}
+
+static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
}
@@ -1188,6 +1196,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t out;
const grpc_op *op;
grpc_ioreq *req;
+ void (*finish_func)(grpc_call *, int, void *) = finish_batch;
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
@@ -1271,6 +1280,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.recv_status_on_client.trailing_metadata;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
+ finish_func = finish_batch_with_close;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
req = &reqs[out++];
@@ -1280,27 +1290,27 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.recv_close_on_server.cancelled;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
+ finish_func = finish_batch_with_close;
break;
}
}
grpc_cq_begin_op(call->cq, call);
- return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
- tag);
+ return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
}
void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
void *value, void (*destroy)(void *value)) {
- if (call->destroy_context[elem]) {
- call->destroy_context[elem](value);
+ if (call->context[elem].destroy) {
+ call->context[elem].destroy(call->context[elem].value);
}
- call->context[elem] = value;
- call->destroy_context[elem] = destroy;
+ call->context[elem].value = value;
+ call->context[elem].destroy = destroy;
}
void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) {
- return call->context[elem];
+ return call->context[elem].value;
}
gpr_uint8 grpc_call_is_client(grpc_call *call) { return call->is_client; }
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 60222bf389..9116538948 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -122,6 +122,16 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
grpc_call *call, const grpc_op *ops, size_t nops,
void *tag);
+void grpc_server_log_request_call(char *file, int line,
+ gpr_log_severity severity,
+ grpc_server *server,
+ grpc_call **call,
+ grpc_call_details *details,
+ grpc_metadata_array *initial_metadata,
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification,
+ void *tag);
+
/* Set a context pointer.
No thread safety guarantees are made wrt this value. */
void grpc_call_context_set(grpc_call *call, grpc_context_index elem, void *value,
@@ -132,6 +142,9 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem);
#define GRPC_CALL_LOG_BATCH(sev, call, ops, nops, tag) \
if (grpc_trace_batch) grpc_call_log_batch(sev, call, ops, nops, tag)
+#define GRPC_SERVER_LOG_REQUEST_CALL(sev, server, call, details, initial_metadata, cq_bound_to_call, cq_for_notifications, tag) \
+ if (grpc_trace_batch) grpc_server_log_request_call(sev, server, call, details, initial_metadata, cq_bound_to_call, cq_for_notifications, tag)
+
gpr_uint8 grpc_call_is_client(grpc_call *call);
#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */
diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c
index e3b3f75b45..9905401bee 100644
--- a/src/core/surface/call_log_batch.c
+++ b/src/core/surface/call_log_batch.c
@@ -119,3 +119,19 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
gpr_free(tmp);
}
}
+
+void grpc_server_log_request_call(char *file, int line,
+ gpr_log_severity severity,
+ grpc_server *server,
+ grpc_call **call,
+ grpc_call_details *details,
+ grpc_metadata_array *initial_metadata,
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification,
+ void *tag) {
+ gpr_log(file, line, severity,
+ "grpc_server_request_call(server=%p, call=%p, details=%p, "
+ "initial_metadata=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
+ "tag=%p)", server, call, details, initial_metadata,
+ cq_bound_to_call, cq_for_notification, tag);
+}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 48910afda3..8c9ca48a05 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -275,14 +275,14 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
return ret;
}
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
ret = ev->base;
gpr_free(ev);
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
return ret;
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 3186292a02..a3b0b2672b 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -55,6 +55,7 @@ static void lame_start_transport_op(grpc_call_element *elem,
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->send_ops) {
+ grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
op->on_done_send(op->send_user_data, 0);
}
if (op->recv_ops) {
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 23156c981e..8ef121dc48 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -46,7 +46,7 @@
#include "src/core/channel/http_client_filter.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_client.h"
-#include "src/core/security/auth.h"
+#include "src/core/security/auth_filters.h"
#include "src/core/security/credentials.h"
#include "src/core/security/secure_transport_setup.h"
#include "src/core/support/string.h"
@@ -226,7 +226,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
GRPC_SECURITY_OK) {
return grpc_lame_client_channel_create();
}
- mdctx = grpc_credentials_get_or_create_metadata_context(creds);
+ mdctx = grpc_mdctx_create();
s = gpr_malloc(sizeof(setup));
connector_arg = grpc_security_connector_to_arg(&connector->base);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 735b8ac4b0..b619bda056 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -427,6 +427,8 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_add_callback(kill_zombie, elem);
} else if (calld->state == PENDING) {
call_list_remove(calld, PENDING_START);
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
}
gpr_mu_unlock(&chand->server->mu);
break;
@@ -672,7 +674,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host) {
registered_method *m;
if (!method) {
- gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
+ gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL");
return NULL;
}
for (m = server->registered_methods; m; m = m->next) {
@@ -708,7 +710,7 @@ void grpc_server_start(grpc_server *server) {
grpc_transport_setup_result grpc_server_setup_transport(
grpc_server *s, grpc_transport *transport,
grpc_channel_filter const **extra_filters, size_t num_extra_filters,
- grpc_mdctx *mdctx) {
+ grpc_mdctx *mdctx, const grpc_channel_args *args) {
size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
grpc_channel_filter const **filters =
gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
@@ -739,8 +741,8 @@ grpc_transport_setup_result grpc_server_setup_transport(
grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
}
- channel = grpc_channel_create_from_filters(filters, num_filters,
- s->channel_args, mdctx, 0);
+ channel =
+ grpc_channel_create_from_filters(filters, num_filters, args, mdctx, 0);
chand = (channel_data *)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)
->channel_data;
@@ -1017,6 +1019,9 @@ grpc_call_error grpc_server_request_call(
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
+ GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
+ initial_metadata, cq_bound_to_call,
+ cq_for_notification, tag);
grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = BATCH_CALL;
rc.tag = tag;
@@ -1135,3 +1140,12 @@ static void publish_registered_or_batch(grpc_call *call, int success,
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
return server->channel_args;
}
+
+int grpc_server_has_open_connections(grpc_server *server) {
+ int r;
+ gpr_mu_lock(&server->mu);
+ r = server->root_channel_data.next != &server->root_channel_data;
+ gpr_mu_unlock(&server->mu);
+ return r;
+}
+
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index c6331033e0..91a1a2a7f6 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -58,8 +58,10 @@ void grpc_server_listener_destroy_done(void *server);
grpc_transport_setup_result grpc_server_setup_transport(
grpc_server *server, grpc_transport *transport,
grpc_channel_filter const **extra_filters, size_t num_extra_filters,
- grpc_mdctx *mdctx);
+ grpc_mdctx *mdctx, const grpc_channel_args *args);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
+int grpc_server_has_open_connections(grpc_server *server);
+
#endif /* GRPC_INTERNAL_CORE_SURFACE_SERVER_H */
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 7b5c2f227b..7e49a531df 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -48,7 +48,8 @@ static grpc_transport_setup_result setup_transport(void *server,
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
return grpc_server_setup_transport(server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx);
+ GPR_ARRAY_SIZE(extra_filters), mdctx,
+ grpc_server_get_channel_args(server));
}
static void new_transport(void *server, grpc_endpoint *tcp) {
diff --git a/src/core/transport/chttp2/alpn.c b/src/core/transport/chttp2/alpn.c
index 11901a58a0..3ccd5796ba 100644
--- a/src/core/transport/chttp2/alpn.c
+++ b/src/core/transport/chttp2/alpn.c
@@ -36,7 +36,8 @@
#include <grpc/support/useful.h>
/* in order of preference */
-static const char *const supported_versions[] = {"h2-16", "h2-15", "h2-14"};
+static const char *const supported_versions[] = {"h2", "h2-17", "h2-16",
+ "h2-15", "h2-14"};
int grpc_chttp2_is_alpn_version_supported(const char *version, size_t size) {
size_t i;
diff --git a/src/core/transport/chttp2/frame.h b/src/core/transport/chttp2/frame.h
index ac76c4cc9c..c9e3e13042 100644
--- a/src/core/transport/chttp2/frame.h
+++ b/src/core/transport/chttp2/frame.h
@@ -53,12 +53,14 @@ typedef struct {
gpr_uint8 send_ping_ack;
gpr_uint8 process_ping_reply;
gpr_uint8 goaway;
+ gpr_uint8 rst_stream;
gpr_int64 initial_window_update;
gpr_uint32 window_update;
gpr_uint32 goaway_last_stream_index;
gpr_uint32 goaway_error;
gpr_slice goaway_text;
+ gpr_uint32 rst_stream_reason;
} grpc_chttp2_parse_state;
#define GRPC_CHTTP2_FRAME_DATA 0
diff --git a/src/core/transport/chttp2/frame_rst_stream.c b/src/core/transport/chttp2/frame_rst_stream.c
index 368ca86481..3016aac7a2 100644
--- a/src/core/transport/chttp2/frame_rst_stream.c
+++ b/src/core/transport/chttp2/frame_rst_stream.c
@@ -32,6 +32,9 @@
*/
#include "src/core/transport/chttp2/frame_rst_stream.h"
+
+#include <grpc/support/log.h>
+
#include "src/core/transport/chttp2/frame.h"
gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) {
@@ -54,3 +57,40 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) {
return slice;
}
+
+grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
+ grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags) {
+ if (length != 4) {
+ gpr_log(GPR_ERROR, "invalid rst_stream: length=%d, flags=%02x", length, flags);
+ return GRPC_CHTTP2_CONNECTION_ERROR;
+ }
+ parser->byte = 0;
+ return GRPC_CHTTP2_PARSE_OK;
+}
+
+grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
+ void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
+ int is_last) {
+ gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
+ gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
+ gpr_uint8 *cur = beg;
+ grpc_chttp2_rst_stream_parser *p = parser;
+
+ while (p->byte != 4 && cur != end) {
+ p->reason_bytes[p->byte] = *cur;
+ cur++;
+ p->byte++;
+ }
+
+ if (p->byte == 4) {
+ GPR_ASSERT(is_last);
+ state->rst_stream = 1;
+ state->rst_stream_reason =
+ (((gpr_uint32)p->reason_bytes[0]) << 24) |
+ (((gpr_uint32)p->reason_bytes[1]) << 16) |
+ (((gpr_uint32)p->reason_bytes[2]) << 8) |
+ (((gpr_uint32)p->reason_bytes[3]));
+ }
+
+ return GRPC_CHTTP2_PARSE_OK;
+}
diff --git a/src/core/transport/chttp2/frame_rst_stream.h b/src/core/transport/chttp2/frame_rst_stream.h
index 2d3ee18637..07a3c98d03 100644
--- a/src/core/transport/chttp2/frame_rst_stream.h
+++ b/src/core/transport/chttp2/frame_rst_stream.h
@@ -35,7 +35,18 @@
#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H
#include <grpc/support/slice.h>
+#include "src/core/transport/chttp2/frame.h"
+
+typedef struct {
+ gpr_uint8 byte;
+ gpr_uint8 reason_bytes[4];
+} grpc_chttp2_rst_stream_parser;
gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 stream_id, gpr_uint32 code);
+grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
+ grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags);
+grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
+ void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */
diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c
index 3fd8f67226..a489543868 100644
--- a/src/core/transport/chttp2/hpack_parser.c
+++ b/src/core/transport/chttp2/hpack_parser.c
@@ -654,7 +654,7 @@ static int parse_stream_weight(grpc_chttp2_hpack_parser *p,
return 1;
}
- return parse_begin(p, cur + 1, end);
+ return p->after_prioritization(p, cur + 1, end);
}
static int parse_stream_dep3(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur,
@@ -1349,7 +1349,7 @@ void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p,
}
void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) {
- GPR_ASSERT(p->state == parse_begin);
+ p->after_prioritization = p->state;
p->state = parse_stream_dep0;
}
diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h
index bb4c1a1f49..bfc06b3980 100644
--- a/src/core/transport/chttp2/hpack_parser.h
+++ b/src/core/transport/chttp2/hpack_parser.h
@@ -62,6 +62,8 @@ struct grpc_chttp2_hpack_parser {
grpc_chttp2_hpack_parser_state state;
/* future states dependent on the opening op code */
const grpc_chttp2_hpack_parser_state *next_state;
+ /* what to do after skipping prioritization data */
+ grpc_chttp2_hpack_parser_state after_prioritization;
/* the value we're currently parsing */
union {
gpr_uint32 *value;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index a6f9f782a1..9dc5f23389 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -154,7 +154,13 @@ typedef enum {
WRITE_STATE_OPEN,
WRITE_STATE_QUEUED_CLOSE,
WRITE_STATE_SENT_CLOSE
-} WRITE_STATE;
+} write_state;
+
+typedef enum {
+ DONT_SEND_CLOSED = 0,
+ SEND_CLOSED,
+ SEND_CLOSED_WITH_RST_STREAM
+} send_closed;
typedef struct {
stream *head;
@@ -267,6 +273,7 @@ struct transport {
grpc_chttp2_window_update_parser window_update;
grpc_chttp2_settings_parser settings;
grpc_chttp2_ping_parser ping;
+ grpc_chttp2_rst_stream_parser rst_stream;
} simple_parsers;
/* goaway */
@@ -312,8 +319,8 @@ struct stream {
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
- WRITE_STATE write_state;
- gpr_uint8 send_closed;
+ write_state write_state;
+ send_closed send_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
@@ -937,7 +944,11 @@ static int prepare_write(transport *t) {
if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
s->outgoing_sopb->nops == 0) {
- s->send_closed = 1;
+ if (!t->is_client && !s->read_closed) {
+ s->send_closed = SEND_CLOSED_WITH_RST_STREAM;
+ } else {
+ s->send_closed = SEND_CLOSED;
+ }
}
if (s->writing_sopb.nops > 0 || s->send_closed) {
stream_list_join(t, s, WRITING);
@@ -982,9 +993,12 @@ static void finalize_outbuf(transport *t) {
while ((s = stream_list_remove_head(t, WRITING))) {
grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
- s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
+ s->send_closed != DONT_SEND_CLOSED, s->id, &t->hpack_compressor, &t->outbuf);
s->writing_sopb.nops = 0;
- if (s->send_closed) {
+ if (s->send_closed == SEND_CLOSED_WITH_RST_STREAM) {
+ gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR));
+ }
+ if (s->send_closed != DONT_SEND_CLOSED) {
stream_list_join(t, s, WRITTEN_CLOSED);
}
}
@@ -999,9 +1013,10 @@ static void finish_write_common(transport *t, int success) {
}
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
s->write_state = WRITE_STATE_SENT_CLOSE;
- if (1||!s->cancelled) {
- maybe_finish_read(t, s);
+ if (!t->is_client) {
+ s->read_closed = 1;
}
+ maybe_finish_read(t, s);
}
t->outbuf.count = 0;
t->outbuf.length = 0;
@@ -1127,6 +1142,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
if (op->recv_ops) {
GPR_ASSERT(s->incoming_sopb == NULL);
+ GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED);
s->recv_done_closure.cb = op->on_done_recv;
s->recv_done_closure.user_data = op->recv_user_data;
s->incoming_sopb = op->recv_ops;
@@ -1214,12 +1230,14 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
- schedule_nuke_sopb(t, &s->parser.incoming_sopb);
- if (s->outgoing_sopb) {
- schedule_nuke_sopb(t, s->outgoing_sopb);
- s->outgoing_sopb = NULL;
- stream_list_remove(t, s, WRITABLE);
- schedule_cb(t, s->send_done_closure, 0);
+ if (error_code != GRPC_CHTTP2_NO_ERROR) {
+ schedule_nuke_sopb(t, &s->parser.incoming_sopb);
+ if (s->outgoing_sopb) {
+ schedule_nuke_sopb(t, s->outgoing_sopb);
+ s->outgoing_sopb = NULL;
+ stream_list_remove(t, s, WRITABLE);
+ schedule_cb(t, s->send_done_closure, 0);
+ }
}
if (s->cancelled) {
send_rst = 0;
@@ -1228,31 +1246,34 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
s->cancelled = 1;
stream_list_join(t, s, CANCELLED);
- gpr_ltoa(local_status, buffer);
- add_incoming_metadata(
- t, s,
- grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
- if (!optional_message) {
- switch (local_status) {
- case GRPC_STATUS_CANCELLED:
- add_incoming_metadata(
- t, s, grpc_mdelem_from_strings(t->metadata_context,
- "grpc-message", "Cancelled"));
- break;
- default:
- break;
- }
- } else {
+ if (error_code != GRPC_CHTTP2_NO_ERROR) {
+ /* synthesize a status if we don't believe we'll get one */
+ gpr_ltoa(local_status, buffer);
add_incoming_metadata(
t, s,
- grpc_mdelem_from_metadata_strings(
- t->metadata_context,
- grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
- grpc_mdstr_ref(optional_message)));
+ grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
+ if (!optional_message) {
+ switch (local_status) {
+ case GRPC_STATUS_CANCELLED:
+ add_incoming_metadata(
+ t, s, grpc_mdelem_from_strings(t->metadata_context,
+ "grpc-message", "Cancelled"));
+ break;
+ default:
+ break;
+ }
+ } else {
+ add_incoming_metadata(
+ t, s,
+ grpc_mdelem_from_metadata_strings(
+ t->metadata_context,
+ grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
+ grpc_mdstr_ref(optional_message)));
+ }
+ add_metadata_batch(t, s);
}
- add_metadata_batch(t, s);
- maybe_finish_read(t, s);
}
+ maybe_finish_read(t, s);
}
if (!id) send_rst = 0;
if (send_rst) {
@@ -1527,6 +1548,19 @@ static int init_ping_parser(transport *t) {
return ok;
}
+static int init_rst_stream_parser(transport *t) {
+ int ok = GRPC_CHTTP2_PARSE_OK ==
+ grpc_chttp2_rst_stream_parser_begin_frame(&t->simple_parsers.rst_stream,
+ t->incoming_frame_size,
+ t->incoming_frame_flags);
+ if (!ok) {
+ drop_connection(t);
+ }
+ t->parser = grpc_chttp2_rst_stream_parser_parse;
+ t->parser_data = &t->simple_parsers.rst_stream;
+ return ok;
+}
+
static int init_goaway_parser(transport *t) {
int ok =
GRPC_CHTTP2_PARSE_OK ==
@@ -1581,12 +1615,7 @@ static int init_frame_parser(transport *t) {
gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
return 0;
case GRPC_CHTTP2_FRAME_RST_STREAM:
- /* TODO(ctiller): actually parse the reason */
- cancel_stream_id(
- t, t->incoming_stream_id,
- grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL),
- GRPC_CHTTP2_CANCEL, 0);
- return init_skip_frame(t, 0);
+ return init_rst_stream_parser(t);
case GRPC_CHTTP2_FRAME_SETTINGS:
return init_settings_frame_parser(t);
case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
@@ -1650,6 +1679,12 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (st.goaway) {
add_goaway(t, st.goaway_error, st.goaway_text);
}
+ if (st.rst_stream) {
+ cancel_stream_id(
+ t, t->incoming_stream_id,
+ grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason),
+ st.rst_stream_reason, 0);
+ }
if (st.process_ping_reply) {
for (i = 0; i < t->ping_count; i++) {
if (0 ==
diff --git a/src/core/transport/metadata.h b/src/core/transport/metadata.h
index e7508718f5..76e3f3c1f8 100644
--- a/src/core/transport/metadata.h
+++ b/src/core/transport/metadata.h
@@ -96,6 +96,7 @@ size_t grpc_mdctx_get_mdtab_free_test_only(grpc_mdctx *mdctx);
/* Constructors for grpc_mdstr instances; take a variety of data types that
clients may have handy */
grpc_mdstr *grpc_mdstr_from_string(grpc_mdctx *ctx, const char *str);
+/* Unrefs the slice. */
grpc_mdstr *grpc_mdstr_from_slice(grpc_mdctx *ctx, gpr_slice slice);
grpc_mdstr *grpc_mdstr_from_buffer(grpc_mdctx *ctx, const gpr_uint8 *str,
size_t length);
@@ -110,6 +111,7 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx, grpc_mdstr *key,
grpc_mdstr *value);
grpc_mdelem *grpc_mdelem_from_strings(grpc_mdctx *ctx, const char *key,
const char *value);
+/* Unrefs the slices. */
grpc_mdelem *grpc_mdelem_from_slices(grpc_mdctx *ctx, gpr_slice key,
gpr_slice value);
grpc_mdelem *grpc_mdelem_from_string_and_buffer(grpc_mdctx *ctx,
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 7a389ea393..6f8d39e352 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -38,6 +38,7 @@
#include "src/core/iomgr/pollset.h"
#include "src/core/transport/stream_op.h"
+#include "src/core/channel/context.h"
/* forward declarations */
typedef struct grpc_transport grpc_transport;
@@ -78,7 +79,7 @@ typedef struct grpc_transport_op {
grpc_mdstr *cancel_message;
/* Indexes correspond to grpc_context_index enum values */
- void *const *context;
+ grpc_call_context_element *context;
} grpc_transport_op;
/* Callbacks made from the transport to the upper layers of grpc. */
diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c
index 9718a0b048..63b4c42131 100644
--- a/src/core/tsi/ssl_transport_security.c
+++ b/src/core/tsi/ssl_transport_security.c
@@ -639,7 +639,7 @@ static tsi_result ssl_protector_protect(tsi_frame_protector* self,
tsi_result result = TSI_OK;
/* First see if we have some pending data in the SSL BIO. */
- size_t pending_in_ssl = BIO_ctrl_pending(impl->from_ssl);
+ size_t pending_in_ssl = BIO_pending(impl->from_ssl);
if (pending_in_ssl > 0) {
*unprotected_bytes_size = 0;
read_from_ssl = BIO_read(impl->from_ssl, protected_output_frames,
@@ -694,7 +694,7 @@ static tsi_result ssl_protector_protect_flush(
impl->buffer_offset = 0;
}
- *still_pending_size = BIO_ctrl_pending(impl->from_ssl);
+ *still_pending_size = BIO_pending(impl->from_ssl);
if (*still_pending_size == 0) return TSI_OK;
read_from_ssl = BIO_read(impl->from_ssl, protected_output_frames,
@@ -704,7 +704,7 @@ static tsi_result ssl_protector_protect_flush(
return TSI_INTERNAL_ERROR;
}
*protected_output_frames_size = read_from_ssl;
- *still_pending_size = BIO_ctrl_pending(impl->from_ssl);
+ *still_pending_size = BIO_pending(impl->from_ssl);
return TSI_OK;
}
@@ -782,7 +782,7 @@ static tsi_result ssl_handshaker_get_bytes_to_send_to_peer(tsi_handshaker* self,
}
}
*bytes_size = (size_t)bytes_read_from_ssl;
- return BIO_ctrl_pending(impl->from_ssl) == 0 ? TSI_OK : TSI_INCOMPLETE_DATA;
+ return BIO_pending(impl->from_ssl) == 0 ? TSI_OK : TSI_INCOMPLETE_DATA;
}
static tsi_result ssl_handshaker_get_result(tsi_handshaker* self) {
@@ -818,7 +818,7 @@ static tsi_result ssl_handshaker_process_bytes_from_peer(
ssl_result = SSL_get_error(impl->ssl, ssl_result);
switch (ssl_result) {
case SSL_ERROR_WANT_READ:
- if (BIO_ctrl_pending(impl->from_ssl) == 0) {
+ if (BIO_pending(impl->from_ssl) == 0) {
/* We need more data. */
return TSI_INCOMPLETE_DATA;
} else {