aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-05-08 17:04:13 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-05-08 17:04:13 -0700
commitf662aa4d6fa77aea771c1164c52d94c5266f0731 (patch)
tree691a5c65c7790d8406507b491c1c50d24bdd76fc /src/core
parentc4440d913ea5635d233139d862e1d49e5afbb2f1 (diff)
parentfcdf33b39f2ccaa69a301810b321dc77b6bcb174 (diff)
Merge github.com:grpc/grpc into bye-bye-completion-queue-pie
Diffstat (limited to 'src/core')
-rw-r--r--src/core/iomgr/endpoint_pair_windows.c2
-rw-r--r--src/core/iomgr/iocp_windows.c35
-rw-r--r--src/core/iomgr/socket_windows.c7
-rw-r--r--src/core/iomgr/socket_windows.h13
-rw-r--r--src/core/iomgr/tcp_client_windows.c24
-rw-r--r--src/core/iomgr/tcp_server_windows.c3
-rw-r--r--src/core/iomgr/tcp_windows.c92
-rw-r--r--src/core/security/auth.c131
-rw-r--r--src/core/security/security_context.c79
-rw-r--r--src/core/security/security_context.h48
-rw-r--r--src/core/surface/call.c12
-rw-r--r--src/core/surface/call.h8
12 files changed, 277 insertions, 177 deletions
diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c
index 58960b6028..7c945ebad4 100644
--- a/src/core/iomgr/endpoint_pair_windows.c
+++ b/src/core/iomgr/endpoint_pair_windows.c
@@ -68,6 +68,8 @@ static void create_sockets(SOCKET sv[2]) {
GPR_ASSERT(svr_sock != INVALID_SOCKET);
closesocket(lst_sock);
+ grpc_tcp_prepare_socket(cli_sock);
+ grpc_tcp_prepare_socket(svr_sock);
sv[1] = cli_sock;
sv[0] = svr_sock;
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 1cdf3da0d6..8827bb99bc 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -53,6 +53,7 @@ static OVERLAPPED g_iocp_custom_overlap;
static gpr_event g_shutdown_iocp;
static gpr_event g_iocp_done;
static gpr_atm g_orphans = 0;
+static gpr_atm g_custom_events = 0;
static HANDLE g_iocp;
@@ -62,20 +63,19 @@ static void do_iocp_work() {
DWORD flags = 0;
ULONG_PTR completion_key;
LPOVERLAPPED overlapped;
- gpr_timespec wait_time = gpr_inf_future;
grpc_winsocket *socket;
grpc_winsocket_callback_info *info;
void(*f)(void *, int) = NULL;
void *opaque = NULL;
success = GetQueuedCompletionStatus(g_iocp, &bytes,
&completion_key, &overlapped,
- gpr_time_to_millis(wait_time));
- if (!success && !overlapped) {
- /* The deadline got attained. */
- return;
- }
+ INFINITE);
+ /* success = 0 and overlapped = NULL means the deadline got attained.
+ Which is impossible. since our wait time is +inf */
+ GPR_ASSERT(success || overlapped);
GPR_ASSERT(completion_key && overlapped);
if (overlapped == &g_iocp_custom_overlap) {
+ gpr_atm_full_fetch_add(&g_custom_events, -1);
if (completion_key == (ULONG_PTR) &g_iocp_kick_token) {
/* We were awoken from a kick. */
return;
@@ -93,13 +93,17 @@ static void do_iocp_work() {
gpr_log(GPR_ERROR, "Unknown IOCP operation");
abort();
}
- success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
- FALSE, &flags);
+ GPR_ASSERT(info->outstanding);
if (socket->orphan) {
- grpc_winsocket_destroy(socket);
- gpr_atm_full_fetch_add(&g_orphans, -1);
+ info->outstanding = 0;
+ if (!socket->read_info.outstanding && !socket->write_info.outstanding) {
+ grpc_winsocket_destroy(socket);
+ gpr_atm_full_fetch_add(&g_orphans, -1);
+ }
return;
}
+ success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
+ FALSE, &flags);
info->bytes_transfered = bytes;
info->wsa_error = success ? 0 : WSAGetLastError();
GPR_ASSERT(overlapped == &info->overlapped);
@@ -117,10 +121,14 @@ static void do_iocp_work() {
}
static void iocp_loop(void *p) {
- while (gpr_atm_acq_load(&g_orphans) || !gpr_event_get(&g_shutdown_iocp)) {
+ void * eventshutdown = NULL;
+ while (gpr_atm_acq_load(&g_orphans) ||
+ gpr_atm_acq_load(&g_custom_events) ||
+ !gpr_event_get(&g_shutdown_iocp)) {
grpc_maybe_call_delayed_callbacks(NULL, 1);
do_iocp_work();
}
+ gpr_log(GPR_DEBUG, "iocp_loop is done");
gpr_event_set(&g_iocp_done, (void *)1);
}
@@ -128,8 +136,8 @@ static void iocp_loop(void *p) {
void grpc_iocp_init(void) {
gpr_thd_id id;
- g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
- (ULONG_PTR)NULL, 0);
+ g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
+ NULL, (ULONG_PTR)NULL, 0);
GPR_ASSERT(g_iocp);
gpr_event_init(&g_iocp_done);
@@ -140,6 +148,7 @@ void grpc_iocp_init(void) {
void grpc_iocp_kick(void) {
BOOL success;
+ gpr_atm_full_fetch_add(&g_custom_events, 1);
success = PostQueuedCompletionStatus(g_iocp, 0,
(ULONG_PTR) &g_iocp_kick_token,
&g_iocp_custom_overlap);
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index 9306310d43..35dbfa1587 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -75,15 +75,14 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket) {
/* Abandons a socket. Either we're going to queue it up for garbage collecting
from the IO Completion Port thread, or destroy it immediately. Note that this
mechanisms assumes that we're either always waiting for an operation, or we
- explicitely know that we don't. If there is a future case where we can have
+ explicitly know that we don't. If there is a future case where we can have
an "idle" socket which is neither trying to read or write, we'd start leaking
both memory and sockets. */
void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
SOCKET socket = winsocket->socket;
- if (!winsocket->closed_early) {
+ if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) {
grpc_iocp_socket_orphan(winsocket);
- }
- if (winsocket->closed_early) {
+ } else {
grpc_winsocket_destroy(winsocket);
}
closesocket(socket);
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index 6e778a776a..8898def854 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -65,12 +65,14 @@ typedef struct grpc_winsocket_callback_info {
/* The results of the overlapped operation. */
DWORD bytes_transfered;
int wsa_error;
+ /* A boolean indicating that we started an operation. */
+ int outstanding;
} grpc_winsocket_callback_info;
/* This is a wrapper to a Windows socket. A socket can have one outstanding
read, and one outstanding write. Doing an asynchronous accept means waiting
for a read operation. Doing an asynchronous connect means waiting for a
- write operation. These are completely abitrary ties between the operation
+ write operation. These are completely arbitrary ties between the operation
and the kind of event, because we can have one overlapped per pending
operation, whichever its nature is. So we could have more dedicated pending
operation callbacks for connect and listen. But given the scope of listen
@@ -87,17 +89,10 @@ typedef struct grpc_winsocket {
/* You can't add the same socket twice to the same IO Completion Port.
This prevents that. */
int added_to_iocp;
- /* A boolean to indicate that the caller has abandonned that socket, but
+ /* A boolean to indicate that the caller has abandoned that socket, but
there is a pending operation that the IO Completion Port will have to
wait for. The socket will be collected at that time. */
int orphan;
- /* A boolean to indicate that the socket was already closed somehow, and
- that no operation is going to be pending. Trying to abandon a socket in
- that state won't result in an orphan, but will instead be destroyed
- without further delay. We could avoid that boolean by adding one into
- grpc_winsocket_callback_info describing that the operation is pending,
- but that 1) waste memory more and 2) obfuscate the intent a bit more. */
- int closed_early;
} grpc_winsocket;
/* Create a wrapped windows handle. This takes ownership of it, meaning that
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index 653c0c65c5..d95346f87a 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -74,7 +74,7 @@ static void async_connect_cleanup(async_connect *ac) {
static void on_alarm(void *acp, int occured) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
- /* If the alarm didn't occor, it got cancelled. */
+ /* If the alarm didn't occur, it got cancelled. */
if (ac->socket != NULL && occured) {
grpc_winsocket_shutdown(ac->socket);
}
@@ -98,6 +98,7 @@ static void on_connect(void *acp, int from_iocp) {
if (from_iocp) {
DWORD transfered_bytes = 0;
DWORD flags;
+ info->outstanding = 0;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
&transfered_bytes, FALSE,
&flags);
@@ -106,10 +107,8 @@ static void on_connect(void *acp, int from_iocp) {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
gpr_free(utf8_message);
- goto finish;
- } else {
+ } else if (!aborted) {
ep = grpc_tcp_create(ac->socket);
- goto finish;
}
} else {
gpr_log(GPR_ERROR, "on_connect is shutting down");
@@ -125,20 +124,12 @@ static void on_connect(void *acp, int from_iocp) {
return;
}
- abort();
+ ac->socket->write_info.outstanding = 0;
-finish:
/* If we don't have an endpoint, it means the connection failed,
so it doesn't matter if it aborted or failed. We need to orphan
that socket. */
- if (!ep || aborted) {
- /* If the connection failed, it means we won't get an IOCP notification,
- so let's flag it as already closed. But if the connection was aborted,
- while we still got an endpoint, we have to wait for the IOCP to collect
- that socket. So let's properly flag that. */
- ac->socket->closed_early = !ep;
- grpc_winsocket_orphan(ac->socket);
- }
+ if (!ep || aborted) grpc_winsocket_orphan(ac->socket);
async_connect_cleanup(ac);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
@@ -189,7 +180,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
&ioctl_num_bytes, NULL, NULL);
if (status != 0) {
- message = "Unable to retreive ConnectEx pointer: %s";
+ message = "Unable to retrieve ConnectEx pointer: %s";
goto failure;
}
@@ -204,6 +195,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
socket = grpc_winsocket_create(sock);
info = &socket->write_info;
+ info->outstanding = 1;
success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped);
/* It wouldn't be unusual to get a success immediately. But we'll still get
@@ -225,6 +217,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
ac->aborted = 0;
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
+ socket->write_info.outstanding = 1;
grpc_socket_notify_on_write(socket, on_connect, ac);
return;
@@ -233,7 +226,6 @@ failure:
gpr_log(GPR_ERROR, message, utf8_message);
gpr_free(utf8_message);
if (socket) {
- socket->closed_early = 1;
grpc_winsocket_orphan(socket);
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index c6137e1e1d..d22acc7453 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -123,7 +123,6 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
closed by the system. */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
- sp->socket->closed_early = 1;
grpc_winsocket_orphan(sp->socket);
}
gpr_free(s->ports);
@@ -249,6 +248,7 @@ static void on_accept(void *arg, int from_iocp) {
if (sp->shutting_down) {
GPR_ASSERT(from_iocp);
sp->shutting_down = 0;
+ sp->socket->read_info.outstanding = 0;
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
gpr_cv_broadcast(&sp->server->cv);
@@ -420,6 +420,7 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset,
s->cb = cb;
s->cb_arg = cb_arg;
for (i = 0; i < s->nports; i++) {
+ s->ports[i].socket->read_info.outstanding = 1;
start_accept(s->ports + i);
s->active_ports++;
}
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index c8483bd891..f16b4c1268 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -86,12 +86,10 @@ typedef struct grpc_tcp {
grpc_endpoint_read_cb read_cb;
void *read_user_data;
gpr_slice read_slice;
- int outstanding_read;
grpc_endpoint_write_cb write_cb;
void *write_user_data;
gpr_slice_buffer write_slices;
- int outstanding_write;
/* The IO Completion Port runs from another thread. We need some mechanism
to protect ourselves when requesting a shutdown. */
@@ -141,14 +139,13 @@ static void on_read(void *tcpp, int from_iocp) {
return;
}
- GPR_ASSERT(tcp->outstanding_read);
+ GPR_ASSERT(tcp->socket->read_info.outstanding);
if (socket->read_info.wsa_error != 0) {
char *utf8_message = gpr_format_message(info->wsa_error);
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
gpr_free(utf8_message);
status = GRPC_ENDPOINT_CB_ERROR;
- socket->closed_early = 1;
} else {
if (info->bytes_transfered != 0) {
sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered);
@@ -161,7 +158,7 @@ static void on_read(void *tcpp, int from_iocp) {
}
}
- tcp->outstanding_read = 0;
+ tcp->socket->read_info.outstanding = 0;
tcp_unref(tcp);
cb(opaque, slice, nslices, status);
@@ -175,13 +172,15 @@ static void win_notify_on_read(grpc_endpoint *ep,
int status;
DWORD bytes_read = 0;
DWORD flags = 0;
- int error;
WSABUF buffer;
- GPR_ASSERT(!tcp->outstanding_read);
- GPR_ASSERT(!tcp->shutting_down);
+ GPR_ASSERT(!tcp->socket->read_info.outstanding);
+ if (tcp->shutting_down) {
+ cb(arg, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
+ return;
+ }
tcp_ref(tcp);
- tcp->outstanding_read = 1;
+ tcp->socket->read_info.outstanding = 1;
tcp->read_cb = cb;
tcp->read_user_data = arg;
@@ -208,34 +207,13 @@ static void win_notify_on_read(grpc_endpoint *ep,
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
&info->overlapped, NULL);
- if (status == 0) {
- grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
- return;
- }
-
- error = WSAGetLastError();
-
- if (error != WSA_IO_PENDING) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "WSARecv error: %s - this means we're going to leak.",
- utf8_message);
- gpr_free(utf8_message);
- /* I'm pretty sure this is a very bad situation there. Hence the log.
- What will happen now is that the socket will neither wait for read
- or write, unless the caller retry, which is unlikely, but I am not
- sure if that's guaranteed. And there might also be a write pending.
- This means that the future orphanage of that socket will be in limbo,
- and we're going to leak it. I have no idea what could cause this
- specific case however, aside from a parameter error from our call.
- Normal read errors would actually happen during the overlapped
- operation, which is the supported way to go for that. */
- tcp->outstanding_read = 0;
- tcp_unref(tcp);
- cb(arg, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
- /* Per the comment above, I'm going to treat that case as a hard failure
- for now, and leave the option to catch that and debug. */
- __debugbreak();
- return;
+ if (status != 0) {
+ int wsa_error = WSAGetLastError();
+ if (wsa_error != WSA_IO_PENDING) {
+ info->wsa_error = wsa_error;
+ on_read(tcp, 1);
+ return;
+ }
}
grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
@@ -260,7 +238,7 @@ static void on_write(void *tcpp, int from_iocp) {
}
gpr_mu_unlock(&tcp->mu);
- GPR_ASSERT(tcp->outstanding_write);
+ GPR_ASSERT(tcp->socket->write_info.outstanding);
if (do_abort) {
if (from_iocp) gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
@@ -274,13 +252,12 @@ static void on_write(void *tcpp, int from_iocp) {
gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
gpr_free(utf8_message);
status = GRPC_ENDPOINT_CB_ERROR;
- tcp->socket->closed_early = 1;
} else {
GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length);
}
gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
- tcp->outstanding_write = 0;
+ tcp->socket->write_info.outstanding = 0;
tcp_unref(tcp);
cb(opaque, status);
@@ -301,11 +278,13 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
WSABUF *allocated = NULL;
WSABUF *buffers = local_buffers;
- GPR_ASSERT(!tcp->outstanding_write);
- GPR_ASSERT(!tcp->shutting_down);
+ GPR_ASSERT(!tcp->socket->write_info.outstanding);
+ if (tcp->shutting_down) {
+ return GRPC_ENDPOINT_WRITE_ERROR;
+ }
tcp_ref(tcp);
- tcp->outstanding_write = 1;
+ tcp->socket->write_info.outstanding = 1;
tcp->write_cb = cb;
tcp->write_user_data = arg;
@@ -341,7 +320,7 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
}
if (allocated) gpr_free(allocated);
gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
- tcp->outstanding_write = 0;
+ tcp->socket->write_info.outstanding = 0;
tcp_unref(tcp);
return ret;
}
@@ -353,29 +332,12 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
&bytes_sent, 0, &socket->write_info.overlapped, NULL);
if (allocated) gpr_free(allocated);
- /* It is possible the operation completed then. But we'd still get an IOCP
- notification. So let's ignore it and wait for the IOCP. */
if (status != 0) {
- int error = WSAGetLastError();
- if (error != WSA_IO_PENDING) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "WSASend error: %s - this means we're going to leak.",
- utf8_message);
- gpr_free(utf8_message);
- /* I'm pretty sure this is a very bad situation there. Hence the log.
- What will happen now is that the socket will neither wait for read
- or write, unless the caller retry, which is unlikely, but I am not
- sure if that's guaranteed. And there might also be a read pending.
- This means that the future orphanage of that socket will be in limbo,
- and we're going to leak it. I have no idea what could cause this
- specific case however, aside from a parameter error from our call.
- Normal read errors would actually happen during the overlapped
- operation, which is the supported way to go for that. */
- tcp->outstanding_write = 0;
+ int wsa_error = WSAGetLastError();
+ if (wsa_error != WSA_IO_PENDING) {
+ gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
+ tcp->socket->write_info.outstanding = 0;
tcp_unref(tcp);
- /* Per the comment above, I'm going to treat that case as a hard failure
- for now, and leave the option to catch that and debug. */
- __debugbreak();
return GRPC_ENDPOINT_WRITE_ERROR;
}
}
diff --git a/src/core/security/auth.c b/src/core/security/auth.c
index 2322c12aa5..faf12d8f14 100644
--- a/src/core/security/auth.c
+++ b/src/core/security/auth.c
@@ -40,6 +40,7 @@
#include "src/core/support/string.h"
#include "src/core/channel/channel_stack.h"
+#include "src/core/security/security_context.h"
#include "src/core/security/security_connector.h"
#include "src/core/security/credentials.h"
#include "src/core/surface/call.h"
@@ -67,6 +68,15 @@ typedef struct {
grpc_mdstr *status_key;
} channel_data;
+static void bubble_up_error(grpc_call_element *elem, const char *error_msg) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_transport_op_add_cancellation(
+ &calld->op, GRPC_STATUS_UNAUTHENTICATED,
+ grpc_mdstr_from_string(chand->md_ctx, error_msg));
+ grpc_call_next_op(elem, &calld->op);
+}
+
static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems,
size_t num_md,
grpc_credentials_status status) {
@@ -75,6 +85,10 @@ static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems,
grpc_transport_op *op = &calld->op;
grpc_metadata_batch *mdb;
size_t i;
+ if (status != GRPC_CREDENTIALS_OK) {
+ bubble_up_error(elem, "Credentials failed to get metadata.");
+ return;
+ }
GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
GPR_ASSERT(op->send_ops && op->send_ops->nops > calld->op_md_idx &&
op->send_ops->ops[calld->op_md_idx].type == GRPC_OP_METADATA);
@@ -108,37 +122,48 @@ static char *build_service_url(const char *url_scheme, call_data *calld) {
static void send_security_metadata(grpc_call_element *elem,
grpc_transport_op *op) {
- /* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
+ channel_data *chand = elem->channel_data;
+ grpc_client_security_context *ctx =
+ (grpc_client_security_context *)op->context[GRPC_CONTEXT_SECURITY];
+ char *service_url = NULL;
grpc_credentials *channel_creds =
- channeld->security_connector->request_metadata_creds;
- /* TODO(jboeuf):
- Decide on the policy in this case:
- - populate both channel and call?
- - the call takes precedence over the channel?
- - leave this decision up to the channel credentials? */
- if (calld->creds != NULL) {
- gpr_log(GPR_ERROR, "Ignoring per call credentials for now.");
+ chand->security_connector->request_metadata_creds;
+ int channel_creds_has_md =
+ (channel_creds != NULL) &&
+ grpc_credentials_has_request_metadata(channel_creds);
+ int call_creds_has_md = (ctx != NULL) && (ctx->creds != NULL) &&
+ grpc_credentials_has_request_metadata(ctx->creds);
+
+ if (!channel_creds_has_md && !call_creds_has_md) {
+ /* Skip sending metadata altogether. */
+ grpc_call_next_op(elem, op);
+ return;
}
- if (channel_creds != NULL &&
- grpc_credentials_has_request_metadata(channel_creds)) {
- char *service_url =
- build_service_url(channeld->security_connector->base.url_scheme, calld);
- calld->op = *op; /* Copy op (originates from the caller's stack). */
- grpc_credentials_get_request_metadata(channel_creds, service_url,
- on_credentials_metadata, elem);
- gpr_free(service_url);
+
+ if (channel_creds_has_md && call_creds_has_md) {
+ calld->creds = grpc_composite_credentials_create(channel_creds, ctx->creds);
+ if (calld->creds == NULL) {
+ bubble_up_error(elem,
+ "Incompatible credentials set on channel and call.");
+ return;
+ }
} else {
- grpc_call_next_op(elem, op);
+ calld->creds =
+ grpc_credentials_ref(call_creds_has_md ? ctx->creds : channel_creds);
}
+
+ service_url =
+ build_service_url(chand->security_connector->base.url_scheme, calld);
+ calld->op = *op; /* Copy op (originates from the caller's stack). */
+ grpc_credentials_get_request_metadata(calld->creds, service_url,
+ on_credentials_metadata, elem);
+ gpr_free(service_url);
}
static void on_host_checked(void *user_data, grpc_security_status status) {
grpc_call_element *elem = (grpc_call_element *)user_data;
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
if (status == GRPC_SECURITY_OK) {
send_security_metadata(elem, &calld->op);
@@ -146,11 +171,8 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
char *error_msg;
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
grpc_mdstr_as_c_string(calld->host));
- grpc_transport_op_add_cancellation(
- &calld->op, GRPC_STATUS_UNAUTHENTICATED,
- grpc_mdstr_from_string(chand->md_ctx, error_msg));
+ bubble_up_error(elem, error_msg);
gpr_free(error_msg);
- grpc_call_next_op(elem, &calld->op);
}
}
@@ -163,7 +185,7 @@ static void auth_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
+ channel_data *chand = elem->channel_data;
grpc_linked_mdelem *l;
size_t i;
@@ -179,10 +201,10 @@ static void auth_start_transport_op(grpc_call_element *elem,
grpc_mdelem *md = l->md;
/* Pointer comparison is OK for md_elems created from the same context.
*/
- if (md->key == channeld->authority_string) {
+ if (md->key == chand->authority_string) {
if (calld->host != NULL) grpc_mdstr_unref(calld->host);
calld->host = grpc_mdstr_ref(md->value);
- } else if (md->key == channeld->path_string) {
+ } else if (md->key == chand->path_string) {
if (calld->method != NULL) grpc_mdstr_unref(calld->method);
calld->method = grpc_mdstr_ref(md->value);
}
@@ -192,18 +214,15 @@ static void auth_start_transport_op(grpc_call_element *elem,
const char *call_host = grpc_mdstr_as_c_string(calld->host);
calld->op = *op; /* Copy op (originates from the caller's stack). */
status = grpc_channel_security_connector_check_call_host(
- channeld->security_connector, call_host, on_host_checked, elem);
+ chand->security_connector, call_host, on_host_checked, elem);
if (status != GRPC_SECURITY_OK) {
if (status == GRPC_SECURITY_ERROR) {
char *error_msg;
gpr_asprintf(&error_msg,
"Invalid host %s set in :authority metadata.",
call_host);
- grpc_transport_op_add_cancellation(
- &calld->op, GRPC_STATUS_UNAUTHENTICATED,
- grpc_mdstr_from_string(channeld->md_ctx, error_msg));
+ bubble_up_error(elem, error_msg);
gpr_free(error_msg);
- grpc_call_next_op(elem, &calld->op);
}
return; /* early exit */
}
@@ -228,8 +247,6 @@ static void channel_op(grpc_channel_element *elem,
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_op *initial_op) {
- /* TODO(jboeuf):
- Find a way to pass-in the credentials from the caller here. */
call_data *calld = elem->call_data;
calld->creds = NULL;
calld->host = NULL;
@@ -242,9 +259,7 @@ static void init_call_elem(grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
- if (calld->creds != NULL) {
- grpc_credentials_unref(calld->creds);
- }
+ grpc_credentials_unref(calld->creds);
if (calld->host != NULL) {
grpc_mdstr_unref(calld->host);
}
@@ -260,7 +275,7 @@ static void init_channel_elem(grpc_channel_element *elem,
int is_last) {
grpc_security_connector *ctx = grpc_find_security_connector_in_args(args);
/* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
+ channel_data *chand = elem->channel_data;
/* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down
@@ -271,35 +286,35 @@ static void init_channel_elem(grpc_channel_element *elem,
/* initialize members */
GPR_ASSERT(ctx->is_client_side);
- channeld->security_connector =
+ chand->security_connector =
(grpc_channel_security_connector *)grpc_security_connector_ref(ctx);
- channeld->md_ctx = metadata_context;
- channeld->authority_string =
- grpc_mdstr_from_string(channeld->md_ctx, ":authority");
- channeld->path_string = grpc_mdstr_from_string(channeld->md_ctx, ":path");
- channeld->error_msg_key =
- grpc_mdstr_from_string(channeld->md_ctx, "grpc-message");
- channeld->status_key =
- grpc_mdstr_from_string(channeld->md_ctx, "grpc-status");
+ chand->md_ctx = metadata_context;
+ chand->authority_string =
+ grpc_mdstr_from_string(chand->md_ctx, ":authority");
+ chand->path_string = grpc_mdstr_from_string(chand->md_ctx, ":path");
+ chand->error_msg_key =
+ grpc_mdstr_from_string(chand->md_ctx, "grpc-message");
+ chand->status_key =
+ grpc_mdstr_from_string(chand->md_ctx, "grpc-status");
}
/* Destructor for channel data */
static void destroy_channel_elem(grpc_channel_element *elem) {
/* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
- grpc_channel_security_connector *ctx = channeld->security_connector;
+ channel_data *chand = elem->channel_data;
+ grpc_channel_security_connector *ctx = chand->security_connector;
if (ctx != NULL) grpc_security_connector_unref(&ctx->base);
- if (channeld->authority_string != NULL) {
- grpc_mdstr_unref(channeld->authority_string);
+ if (chand->authority_string != NULL) {
+ grpc_mdstr_unref(chand->authority_string);
}
- if (channeld->error_msg_key != NULL) {
- grpc_mdstr_unref(channeld->error_msg_key);
+ if (chand->error_msg_key != NULL) {
+ grpc_mdstr_unref(chand->error_msg_key);
}
- if (channeld->status_key != NULL) {
- grpc_mdstr_unref(channeld->status_key);
+ if (chand->status_key != NULL) {
+ grpc_mdstr_unref(chand->status_key);
}
- if (channeld->path_string != NULL) {
- grpc_mdstr_unref(channeld->path_string);
+ if (chand->path_string != NULL) {
+ grpc_mdstr_unref(chand->path_string);
}
}
diff --git a/src/core/security/security_context.c b/src/core/security/security_context.c
new file mode 100644
index 0000000000..b90dc5097a
--- /dev/null
+++ b/src/core/security/security_context.c
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include "src/core/security/security_context.h"
+#include "src/core/surface/call.h"
+
+#include <grpc/grpc_security.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+grpc_call_error grpc_call_set_credentials(grpc_call *call,
+ grpc_credentials *creds) {
+ grpc_client_security_context *ctx = NULL;
+ if (!grpc_call_is_client(call)) {
+ gpr_log(GPR_ERROR, "Method is client-side only.");
+ return GRPC_CALL_ERROR_NOT_ON_SERVER;
+ }
+ if (creds != NULL && !grpc_credentials_has_request_metadata_only(creds)) {
+ gpr_log(GPR_ERROR, "Incompatible credentials to set on a call.");
+ return GRPC_CALL_ERROR;
+ }
+ ctx = (grpc_client_security_context *)grpc_call_context_get(
+ call, GRPC_CONTEXT_SECURITY);
+ if (ctx == NULL) {
+ ctx = grpc_client_security_context_create();
+ ctx->creds = grpc_credentials_ref(creds);
+ grpc_call_context_set(call, GRPC_CONTEXT_SECURITY, ctx,
+ grpc_client_security_context_destroy);
+ } else {
+ grpc_credentials_unref(ctx->creds);
+ ctx->creds = grpc_credentials_ref(creds);
+ }
+ return GRPC_CALL_OK;
+}
+
+grpc_client_security_context *grpc_client_security_context_create(void) {
+ grpc_client_security_context *ctx =
+ gpr_malloc(sizeof(grpc_client_security_context));
+ memset(ctx, 0, sizeof(grpc_client_security_context));
+ return ctx;
+}
+
+void grpc_client_security_context_destroy(void *ctx) {
+ grpc_client_security_context *c = (grpc_client_security_context *)ctx;
+ grpc_credentials_unref(c->creds);
+ gpr_free(ctx);
+}
diff --git a/src/core/security/security_context.h b/src/core/security/security_context.h
new file mode 100644
index 0000000000..561633b452
--- /dev/null
+++ b/src/core/security/security_context.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONTEXT_H
+#define GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONTEXT_H
+
+#include "src/core/security/credentials.h"
+
+/* Security context attached to a client-side call. */
+typedef struct {
+ grpc_credentials *creds;
+} grpc_client_security_context;
+
+grpc_client_security_context *grpc_client_security_context_create(void);
+void grpc_client_security_context_destroy(void *ctx);
+
+#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONTEXT_H */
+
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 647c0ab63c..50df36cae9 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -374,18 +374,10 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
static void set_status_code(grpc_call *call, status_source source,
gpr_uint32 status) {
- int flush;
-
call->status[source].is_set = 1;
call->status[source].code = status;
- if (call->is_client) {
- flush = status == GRPC_STATUS_CANCELLED;
- } else {
- flush = status != GRPC_STATUS_OK;
- }
-
- if (flush && !grpc_bbq_empty(&call->incoming_queue)) {
+ if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
grpc_bbq_flush(&call->incoming_queue);
}
}
@@ -1307,3 +1299,5 @@ void grpc_call_context_set(grpc_call *call, grpc_context_index elem, void *value
void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) {
return call->context[elem];
}
+
+gpr_uint8 grpc_call_is_client(grpc_call *call) { return call->is_client; }
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 7d3e661bf3..60222bf389 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -97,12 +97,14 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
void grpc_call_internal_ref(grpc_call *call, const char *reason);
void grpc_call_internal_unref(grpc_call *call, const char *reason, int allow_immediate_deletion);
#define GRPC_CALL_INTERNAL_REF(call, reason) grpc_call_internal_ref(call, reason)
-#define GRPC_CALL_INTERNAL_UNREF(call, reason, allow_immediate_deletion) grpc_call_internal_unref(call, reason, allow_immediate_deletion)
+#define GRPC_CALL_INTERNAL_UNREF(call, reason, allow_immediate_deletion) \
+ grpc_call_internal_unref(call, reason, allow_immediate_deletion)
#else
void grpc_call_internal_ref(grpc_call *call);
void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
#define GRPC_CALL_INTERNAL_REF(call, reason) grpc_call_internal_ref(call)
-#define GRPC_CALL_INTERNAL_UNREF(call, reason, allow_immediate_deletion) grpc_call_internal_unref(call, allow_immediate_deletion)
+#define GRPC_CALL_INTERNAL_UNREF(call, reason, allow_immediate_deletion) \
+ grpc_call_internal_unref(call, allow_immediate_deletion)
#endif
grpc_call_error grpc_call_start_ioreq_and_call_back(
@@ -130,4 +132,6 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem);
#define GRPC_CALL_LOG_BATCH(sev, call, ops, nops, tag) \
if (grpc_trace_batch) grpc_call_log_batch(sev, call, ops, nops, tag)
+gpr_uint8 grpc_call_is_client(grpc_call *call);
+
#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */