diff options
Diffstat (limited to 'src')
37 files changed, 727 insertions, 173 deletions
diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c new file mode 100644 index 0000000000..d78b6ea957 --- /dev/null +++ b/src/core/iomgr/endpoint_pair_windows.c @@ -0,0 +1,85 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_WINSOCK_SOCKET +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/iomgr/endpoint_pair.h" + +#include <errno.h> +#include <fcntl.h> +#include <string.h> + +#include "src/core/iomgr/tcp_windows.h" +#include "src/core/iomgr/socket_windows.h" +#include <grpc/support/log.h> + +static void create_sockets(SOCKET sv[2]) { + SOCKET svr_sock = INVALID_SOCKET; + SOCKET lst_sock = INVALID_SOCKET; + SOCKET cli_sock = INVALID_SOCKET; + SOCKADDR_IN addr; + int addr_len; + + lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); + GPR_ASSERT(lst_sock != INVALID_SOCKET); + + memset(&addr, 0, sizeof(addr)); + GPR_ASSERT(bind(lst_sock, (struct sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR); + GPR_ASSERT(listen(lst_sock, SOMAXCONN) != SOCKET_ERROR); + GPR_ASSERT(getsockname(lst_sock, (struct sockaddr*)&addr, &addr_len) != SOCKET_ERROR); + + cli_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); + GPR_ASSERT(cli_sock != INVALID_SOCKET); + + GPR_ASSERT(WSAConnect(cli_sock, (struct sockaddr*)&addr, addr_len, NULL, NULL, NULL, NULL) == 0); + svr_sock = accept(lst_sock, (struct sockaddr*)&addr, &addr_len); + GPR_ASSERT(svr_sock != INVALID_SOCKET); + + closesocket(lst_sock); + + sv[1] = cli_sock; + sv[0] = svr_sock; +} + +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) { + SOCKET sv[2]; + grpc_endpoint_pair p; + create_sockets(sv); + p.client = grpc_tcp_create(grpc_winsocket_create(sv[1])); + p.server = grpc_tcp_create(grpc_winsocket_create(sv[0])); + return p; +} + +#endif diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 895f85fc68..7e31f2d7a5 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -174,7 +174,6 @@ void grpc_tcp_server_destroy( while (s->active_ports) { gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); } - gpr_mu_unlock(&s->mu); /* delete ALL the things */ if (s->nports) { @@ -185,7 +184,9 @@ void grpc_tcp_server_destroy( } grpc_fd_orphan(sp->emfd, destroyed_port, s); } + gpr_mu_unlock(&s->mu); } else { + gpr_mu_unlock(&s->mu); finish_shutdown(s); } } diff --git a/src/core/profiling/timers.c b/src/core/profiling/timers.c new file mode 100644 index 0000000000..478397d1bf --- /dev/null +++ b/src/core/profiling/timers.c @@ -0,0 +1,138 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifdef GRPC_LATENCY_PROFILER + +#include "src/core/profiling/timers.h" +#include "src/core/profiling/timers_preciseclock.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/sync.h> +#include <stdio.h> + +typedef struct grpc_timer_entry { + grpc_precise_clock tm; + const char* tag; + int seq; + const char* file; + int line; +} grpc_timer_entry; + +struct grpc_timers_log { + gpr_mu mu; + grpc_timer_entry* log; + int num_entries; + int capacity; + int capacity_limit; + FILE* fp; +}; + +grpc_timers_log* grpc_timers_log_global = NULL; + +grpc_timers_log* grpc_timers_log_create(int capacity_limit, FILE* dump) { + grpc_timers_log* log = gpr_malloc(sizeof(*log)); + + /* TODO (vpai): Allow allocation below limit */ + log->log = gpr_malloc(capacity_limit * sizeof(*log->log)); + + /* TODO (vpai): Improve concurrency, do per-thread logging? */ + gpr_mu_init(&log->mu); + + log->num_entries = 0; + log->capacity = log->capacity_limit = capacity_limit; + + log->fp = dump; + + return log; +} + +static void log_report_locked(grpc_timers_log* log) { + FILE* fp = log->fp; + int i; + for (i = 0; i < log->num_entries; i++) { + grpc_timer_entry* entry = &(log->log[i]); + fprintf(fp, "GRPC_LAT_PROF "); + grpc_precise_clock_print(&entry->tm, fp); + fprintf(fp, " %s#%d,%s:%d\n", entry->tag, entry->seq, entry->file, + entry->line); + } + + /* Now clear out the log */ + log->num_entries = 0; +} + +void grpc_timers_log_destroy(grpc_timers_log* log) { + gpr_mu_lock(&log->mu); + log_report_locked(log); + gpr_mu_unlock(&log->mu); + + gpr_free(log->log); + gpr_mu_destroy(&log->mu); + + gpr_free(log); +} + +void grpc_timers_log_add(grpc_timers_log* log, const char* tag, int seq, + const char* file, int line) { + grpc_timer_entry* entry; + + /* TODO (vpai) : Improve concurrency */ + gpr_mu_lock(&log->mu); + if (log->num_entries == log->capacity_limit) { + log_report_locked(log); + } + + entry = &log->log[log->num_entries++]; + + grpc_precise_clock_now(&entry->tm); + entry->tag = tag; + entry->seq = seq; + entry->file = file; + entry->line = line; + + gpr_mu_unlock(&log->mu); +} + +void grpc_timers_log_global_init(void) { + grpc_timers_log_global = grpc_timers_log_create(100000, stdout); +} + +void grpc_timers_log_global_destroy(void) { + grpc_timers_log_destroy(grpc_timers_log_global); +} +#else /* !GRPC_LATENCY_PROFILER */ +void grpc_timers_log_global_init(void) {} +void grpc_timers_log_global_destroy(void) {} +#endif /* GRPC_LATENCY_PROFILER */ diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h new file mode 100644 index 0000000000..ef4cad112a --- /dev/null +++ b/src/core/profiling/timers.h @@ -0,0 +1,70 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_PROFILING_TIMERS_H +#define GRPC_CORE_PROFILING_TIMERS_H + +#include <stdio.h> + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef GRPC_LATENCY_PROFILER + +typedef struct grpc_timers_log grpc_timers_log; + +grpc_timers_log *grpc_timers_log_create(int capacity_limit, FILE *dump); +void grpc_timers_log_add(grpc_timers_log *, const char *tag, int seq, + const char *file, int line); +void grpc_timers_log_destroy(grpc_timers_log *); + +extern grpc_timers_log *grpc_timers_log_global; + +#define GRPC_TIMER_MARK(x, s) \ + grpc_timers_log_add(grpc_timers_log_global, #x, s, __FILE__, __LINE__) + +#else /* !GRPC_LATENCY_PROFILER */ +#define GRPC_TIMER_MARK(x, s) \ + do { \ + } while (0) +#endif /* GRPC_LATENCY_PROFILER */ + +void grpc_timers_log_global_init(void); +void grpc_timers_log_global_destroy(void); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_PROFILING_TIMERS_H */ diff --git a/src/core/profiling/timers_preciseclock.h b/src/core/profiling/timers_preciseclock.h new file mode 100644 index 0000000000..bf4a0eab8a --- /dev/null +++ b/src/core/profiling/timers_preciseclock.h @@ -0,0 +1,56 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_PROFILING_TIMERS_PRECISECLOCK_H +#define GRPC_CORE_PROFILING_TIMERS_PRECISECLOCK_H + +#include <grpc/support/time.h> +#include <stdio.h> + +typedef struct grpc_precise_clock grpc_precise_clock; + +#ifdef GRPC_TIMERS_RDTSC +#error RDTSC timers not currently supported +#else +struct grpc_precise_clock { + gpr_timespec clock; +}; +static void grpc_precise_clock_now(grpc_precise_clock* clk) { + clk->clock = gpr_now(); +} +static void grpc_precise_clock_print(const grpc_precise_clock* clk, FILE* fp) { + fprintf(fp, "%ld.%09d", clk->clock.tv_sec, clk->clock.tv_nsec); +} +#endif /* GRPC_TIMERS_RDTSC */ + +#endif /* GRPC_CORE_PROFILING_TIMERS_PRECISECLOCK_H */ diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index 698e099134..e6d2e9e332 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -111,6 +111,11 @@ void grpc_credentials_get_request_metadata(grpc_credentials *creds, creds->vtable->get_request_metadata(creds, service_url, cb, user_data); } +grpc_mdctx *grpc_credentials_get_metadata_context(grpc_credentials *creds) { + if (creds == NULL) return NULL; + return creds->vtable->get_metadata_context(creds); +} + void grpc_server_credentials_release(grpc_server_credentials *creds) { if (creds == NULL) return; creds->vtable->destroy(creds); @@ -167,8 +172,13 @@ static int ssl_has_request_metadata_only(const grpc_credentials *creds) { return 0; } +static grpc_mdctx *ssl_get_metadata_context(grpc_credentials *creds) { + return NULL; +} + static grpc_credentials_vtable ssl_vtable = { - ssl_destroy, ssl_has_request_metadata, ssl_has_request_metadata_only, NULL}; + ssl_destroy, ssl_has_request_metadata, ssl_has_request_metadata_only, + ssl_get_metadata_context, NULL}; static grpc_server_credentials_vtable ssl_server_vtable = {ssl_server_destroy}; @@ -371,9 +381,14 @@ static void jwt_get_request_metadata(grpc_credentials *creds, } } +static grpc_mdctx *jwt_get_metadata_context(grpc_credentials *creds) { + grpc_jwt_credentials *c = (grpc_jwt_credentials *)creds; + return c->md_ctx; +} + static grpc_credentials_vtable jwt_vtable = { jwt_destroy, jwt_has_request_metadata, jwt_has_request_metadata_only, - jwt_get_request_metadata}; + jwt_get_metadata_context, jwt_get_request_metadata}; grpc_credentials *grpc_jwt_credentials_create(const char *json_key, gpr_timespec token_lifetime) { @@ -585,11 +600,19 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c, c->fetch_func = fetch_func; } +static grpc_mdctx *oauth2_token_fetcher_get_metadata_context( + grpc_credentials *creds) { + grpc_oauth2_token_fetcher_credentials *c = + (grpc_oauth2_token_fetcher_credentials *)creds; + return c->md_ctx; +} + /* -- ComputeEngine credentials. -- */ static grpc_credentials_vtable compute_engine_vtable = { oauth2_token_fetcher_destroy, oauth2_token_fetcher_has_request_metadata, oauth2_token_fetcher_has_request_metadata_only, + oauth2_token_fetcher_get_metadata_context, oauth2_token_fetcher_get_request_metadata}; static void compute_engine_fetch_oauth2( @@ -633,6 +656,7 @@ static void service_account_destroy(grpc_credentials *creds) { static grpc_credentials_vtable service_account_vtable = { service_account_destroy, oauth2_token_fetcher_has_request_metadata, oauth2_token_fetcher_has_request_metadata_only, + oauth2_token_fetcher_get_metadata_context, oauth2_token_fetcher_get_request_metadata}; static void service_account_fetch_oauth2( @@ -706,6 +730,7 @@ static void refresh_token_destroy(grpc_credentials *creds) { static grpc_credentials_vtable refresh_token_vtable = { refresh_token_destroy, oauth2_token_fetcher_has_request_metadata, oauth2_token_fetcher_has_request_metadata_only, + oauth2_token_fetcher_get_metadata_context, oauth2_token_fetcher_get_request_metadata}; static void refresh_token_fetch_oauth2( @@ -801,9 +826,15 @@ static void fake_oauth2_get_request_metadata(grpc_credentials *creds, } } +static grpc_mdctx *fake_oauth2_get_metadata_context(grpc_credentials *creds) { + grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)creds; + return c->md_ctx; +} + static grpc_credentials_vtable fake_oauth2_vtable = { fake_oauth2_destroy, fake_oauth2_has_request_metadata, - fake_oauth2_has_request_metadata_only, fake_oauth2_get_request_metadata}; + fake_oauth2_has_request_metadata_only, fake_oauth2_get_metadata_context, + fake_oauth2_get_request_metadata}; grpc_credentials *grpc_fake_oauth2_credentials_create( const char *token_md_value, int is_async) { @@ -842,10 +873,16 @@ static int fake_transport_security_has_request_metadata_only( return 0; } +static grpc_mdctx *fake_transport_security_get_metadata_context( + grpc_credentials *c) { + return NULL; +} + static grpc_credentials_vtable fake_transport_security_credentials_vtable = { fake_transport_security_credentials_destroy, fake_transport_security_has_request_metadata, - fake_transport_security_has_request_metadata_only, NULL}; + fake_transport_security_has_request_metadata_only, + fake_transport_security_get_metadata_context, NULL}; static grpc_server_credentials_vtable fake_transport_security_server_credentials_vtable = { @@ -995,9 +1032,26 @@ static void composite_get_request_metadata(grpc_credentials *creds, GPR_ASSERT(0); /* Should have exited before. */ } +static grpc_mdctx *composite_get_metadata_context(grpc_credentials *creds) { + grpc_composite_credentials *c = (grpc_composite_credentials *)creds; + grpc_mdctx *ctx = NULL; + size_t i; + for (i = 0; i < c->inner.num_creds; i++) { + grpc_credentials *inner_creds = c->inner.creds_array[i]; + grpc_mdctx *inner_ctx = grpc_credentials_get_metadata_context(inner_creds); + if (inner_ctx) { + GPR_ASSERT(ctx == NULL && + "can only have one metadata context per composite credential"); + ctx = inner_ctx; + } + } + return ctx; +} + static grpc_credentials_vtable composite_credentials_vtable = { composite_destroy, composite_has_request_metadata, - composite_has_request_metadata_only, composite_get_request_metadata}; + composite_has_request_metadata_only, composite_get_metadata_context, + composite_get_request_metadata}; static grpc_credentials_array get_creds_array(grpc_credentials **creds_addr) { grpc_credentials_array result; @@ -1102,9 +1156,14 @@ static void iam_get_request_metadata(grpc_credentials *creds, cb(user_data, md_array, 2, GRPC_CREDENTIALS_OK); } +static grpc_mdctx *iam_get_metadata_context(grpc_credentials *creds) { + grpc_iam_credentials *c = (grpc_iam_credentials *)creds; + return c->md_ctx; +} + static grpc_credentials_vtable iam_vtable = { iam_destroy, iam_has_request_metadata, iam_has_request_metadata_only, - iam_get_request_metadata}; + iam_get_metadata_context, iam_get_request_metadata}; grpc_credentials *grpc_iam_credentials_create(const char *token, const char *authority_selector) { diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h index 0f70670ced..562b3faa33 100644 --- a/src/core/security/credentials.h +++ b/src/core/security/credentials.h @@ -94,6 +94,7 @@ typedef struct { void (*destroy)(grpc_credentials *c); int (*has_request_metadata)(const grpc_credentials *c); int (*has_request_metadata_only)(const grpc_credentials *c); + grpc_mdctx *(*get_metadata_context)(grpc_credentials *c); void (*get_request_metadata)(grpc_credentials *c, const char *service_url, grpc_credentials_metadata_cb cb, @@ -114,6 +115,8 @@ void grpc_credentials_get_request_metadata(grpc_credentials *creds, const char *service_url, grpc_credentials_metadata_cb cb, void *user_data); +grpc_mdctx *grpc_credentials_get_metadata_context(grpc_credentials *creds); + typedef struct { unsigned char *pem_private_key; size_t pem_private_key_size; diff --git a/src/core/security/factories.c b/src/core/security/factories.c index 02267d5545..3d9216aac4 100644 --- a/src/core/security/factories.c +++ b/src/core/security/factories.c @@ -50,3 +50,19 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, return grpc_secure_channel_create_with_factories( factories, GPR_ARRAY_SIZE(factories), creds, target, args); } + +grpc_security_status grpc_server_security_context_create( + grpc_server_credentials *creds, grpc_security_context **ctx) { + grpc_security_status status = GRPC_SECURITY_ERROR; + + *ctx = NULL; + if (strcmp(creds->type, GRPC_CREDENTIALS_TYPE_SSL) == 0) { + status = grpc_ssl_server_security_context_create( + grpc_ssl_server_credentials_get_config(creds), ctx); + } else if (strcmp(creds->type, + GRPC_CREDENTIALS_TYPE_FAKE_TRANSPORT_SECURITY) == 0) { + *ctx = grpc_fake_server_security_context_create(); + status = GRPC_SECURITY_OK; + } + return status; +} diff --git a/src/core/security/security_context.c b/src/core/security/security_context.c index e180cad52b..08137803a3 100644 --- a/src/core/security/security_context.c +++ b/src/core/security/security_context.c @@ -165,6 +165,16 @@ static int check_request_metadata_creds(grpc_credentials *creds) { return 1; } +static grpc_mdctx *get_or_create_mdctx(grpc_credentials *creds) { + grpc_mdctx *mdctx = grpc_credentials_get_metadata_context(creds); + if (mdctx == NULL) { + mdctx = grpc_mdctx_create(); + } else { + grpc_mdctx_ref(mdctx); + } + return mdctx; +} + /* -- Fake implementation. -- */ typedef struct { @@ -626,7 +636,8 @@ grpc_channel *grpc_ssl_channel_create(grpc_credentials *ssl_creds, arg.key = GRPC_ARG_HTTP2_SCHEME; arg.value.string = "https"; new_args = grpc_channel_args_copy_and_add(args, &arg); - channel = grpc_secure_channel_create_internal(target, new_args, ctx); + channel = grpc_secure_channel_create_internal( + target, new_args, ctx, get_or_create_mdctx(request_metadata_creds)); grpc_security_context_unref(&ctx->base); grpc_channel_args_destroy(new_args); return channel; @@ -637,8 +648,8 @@ grpc_channel *grpc_fake_transport_security_channel_create( const char *target, const grpc_channel_args *args) { grpc_channel_security_context *ctx = grpc_fake_channel_security_context_create(request_metadata_creds, 1); - grpc_channel *channel = - grpc_secure_channel_create_internal(target, args, ctx); + grpc_channel *channel = grpc_secure_channel_create_internal( + target, args, ctx, get_or_create_mdctx(request_metadata_creds)); grpc_security_context_unref(&ctx->base); return channel; } diff --git a/src/core/security/security_context.h b/src/core/security/security_context.h index 0b5821c3c0..8e7ba34cac 100644 --- a/src/core/security/security_context.h +++ b/src/core/security/security_context.h @@ -190,7 +190,7 @@ grpc_channel *grpc_fake_transport_security_channel_create( grpc_channel *grpc_secure_channel_create_internal( const char *target, const grpc_channel_args *args, - grpc_channel_security_context *ctx); + grpc_channel_security_context *ctx, grpc_mdctx *mdctx); typedef grpc_channel *(*grpc_secure_channel_factory_func)( grpc_credentials *transport_security_creds, @@ -206,10 +206,9 @@ grpc_channel *grpc_secure_channel_create_with_factories( const grpc_secure_channel_factory *factories, size_t num_factories, grpc_credentials *creds, const char *target, const grpc_channel_args *args); -/* Secure server creation. */ +/* Secure server context creation. */ -grpc_server *grpc_secure_server_create_internal(grpc_completion_queue *cq, - const grpc_channel_args *args, - grpc_security_context *ctx); +grpc_security_status grpc_server_security_context_create( + grpc_server_credentials *creds, grpc_security_context **ctx); #endif /* GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONTEXT_H */ diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 081272724c..165ed5474f 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -141,16 +141,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, /* create security context */ if (creds == NULL) goto error; - - if (strcmp(creds->type, GRPC_CREDENTIALS_TYPE_SSL) == 0) { - status = grpc_ssl_server_security_context_create( - grpc_ssl_server_credentials_get_config(creds), &ctx); - } else if (strcmp(creds->type, - GRPC_CREDENTIALS_TYPE_FAKE_TRANSPORT_SECURITY) == 0) { - ctx = grpc_fake_server_security_context_create(); - status = GRPC_SECURITY_OK; - } - + status = grpc_server_security_context_create(creds, &ctx); if (status != GRPC_SECURITY_OK) { gpr_log(GPR_ERROR, "Unable to create secure server with credentials of type %s.", diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c index b280e4bd02..3b1daa07c5 100644 --- a/src/core/support/slice_buffer.c +++ b/src/core/support/slice_buffer.c @@ -38,21 +38,34 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -/* initial allocation size (# of slices) */ -#define INITIAL_CAPACITY 4 -/* grow a buffer; requires INITIAL_CAPACITY > 1 */ +/* grow a buffer; requires GRPC_SLICE_BUFFER_INLINE_ELEMENTS > 1 */ #define GROW(x) (3 * (x) / 2) +static void maybe_embiggen(gpr_slice_buffer *sb) { + if (sb->count == sb->capacity) { + sb->capacity = GROW(sb->capacity); + GPR_ASSERT(sb->capacity > sb->count); + if (sb->slices == sb->inlined) { + sb->slices = gpr_malloc(sb->capacity * sizeof(gpr_slice)); + memcpy(sb->slices, sb->inlined, sb->count * sizeof(gpr_slice)); + } else { + sb->slices = gpr_realloc(sb->slices, sb->capacity * sizeof(gpr_slice)); + } + } +} + void gpr_slice_buffer_init(gpr_slice_buffer *sb) { sb->count = 0; sb->length = 0; - sb->capacity = INITIAL_CAPACITY; - sb->slices = gpr_malloc(sizeof(gpr_slice) * INITIAL_CAPACITY); + sb->capacity = GRPC_SLICE_BUFFER_INLINE_ELEMENTS; + sb->slices = sb->inlined; } void gpr_slice_buffer_destroy(gpr_slice_buffer *sb) { gpr_slice_buffer_reset_and_unref(sb); - gpr_free(sb->slices); + if (sb->slices != sb->inlined) { + gpr_free(sb->slices); + } } gpr_uint8 *gpr_slice_buffer_tiny_add(gpr_slice_buffer *sb, unsigned n) { @@ -71,11 +84,7 @@ gpr_uint8 *gpr_slice_buffer_tiny_add(gpr_slice_buffer *sb, unsigned n) { return out; add_new: - if (sb->count == sb->capacity) { - sb->capacity = GROW(sb->capacity); - GPR_ASSERT(sb->capacity > sb->count); - sb->slices = gpr_realloc(sb->slices, sb->capacity * sizeof(gpr_slice)); - } + maybe_embiggen(sb); back = &sb->slices[sb->count]; sb->count++; back->refcount = NULL; @@ -85,11 +94,7 @@ add_new: size_t gpr_slice_buffer_add_indexed(gpr_slice_buffer *sb, gpr_slice s) { size_t out = sb->count; - if (out == sb->capacity) { - sb->capacity = GROW(sb->capacity); - GPR_ASSERT(sb->capacity > sb->count); - sb->slices = gpr_realloc(sb->slices, sb->capacity * sizeof(gpr_slice)); - } + maybe_embiggen(sb); sb->slices[out] = s; sb->length += GPR_SLICE_LENGTH(s); sb->count = out + 1; @@ -116,12 +121,7 @@ void gpr_slice_buffer_add(gpr_slice_buffer *sb, gpr_slice s) { memcpy(back->data.inlined.bytes + back->data.inlined.length, s.data.inlined.bytes, cp1); back->data.inlined.length = GPR_SLICE_INLINED_SIZE; - if (n == sb->capacity) { - sb->capacity = GROW(sb->capacity); - GPR_ASSERT(sb->capacity > sb->count); - sb->slices = - gpr_realloc(sb->slices, sb->capacity * sizeof(gpr_slice)); - } + maybe_embiggen(sb); back = &sb->slices[n]; sb->count = n + 1; back->refcount = NULL; @@ -160,3 +160,16 @@ void gpr_slice_buffer_reset_and_unref(gpr_slice_buffer *sb) { sb->count = 0; sb->length = 0; } + +void gpr_slice_buffer_swap(gpr_slice_buffer *a, gpr_slice_buffer *b) { + gpr_slice_buffer temp = *a; + *a = *b; + *b = temp; + + if (a->slices == b->inlined) { + a->slices = a->inlined; + } + if (b->slices == a->inlined) { + b->slices = b->inlined; + } +} diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c index f221cb5790..539470bccf 100644 --- a/src/core/support/time_win32.c +++ b/src/core/support/time_win32.c @@ -39,6 +39,7 @@ #include <grpc/support/time.h> #include <sys/timeb.h> +#include <windows.h> gpr_timespec gpr_now(void) { gpr_timespec now_tv; @@ -49,4 +50,23 @@ gpr_timespec gpr_now(void) { return now_tv; } +void gpr_sleep_until(gpr_timespec until) { + gpr_timespec now; + gpr_timespec delta; + DWORD sleep_millis; + + for (;;) { + /* We could simplify by using clock_nanosleep instead, but it might be + * slightly less portable. */ + now = gpr_now(); + if (gpr_time_cmp(until, now) <= 0) { + return; + } + + delta = gpr_time_sub(until, now); + sleep_millis = delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS; + Sleep(sleep_millis); + } +} + #endif /* GPR_WIN32 */ diff --git a/src/core/surface/init.c b/src/core/surface/init.c index d4f0eb40e8..4de51a666f 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -32,10 +32,11 @@ */ #include <grpc/grpc.h> -#include "src/core/iomgr/iomgr.h" +#include "src/core/channel/channel_stack.h" #include "src/core/debug/trace.h" +#include "src/core/iomgr/iomgr.h" #include "src/core/statistics/census_interface.h" -#include "src/core/channel/channel_stack.h" +#include "src/core/profiling/timers.h" #include "src/core/surface/call.h" #include "src/core/surface/init.h" #include "src/core/surface/surface_trace.h" @@ -63,6 +64,7 @@ void grpc_init(void) { grpc_tracer_init("GRPC_TRACE"); grpc_iomgr_init(); census_init(); + grpc_timers_log_global_init(); } gpr_mu_unlock(&g_init_mu); } @@ -72,6 +74,7 @@ void grpc_shutdown(void) { if (--g_initializations == 0) { grpc_iomgr_shutdown(); census_shutdown(); + grpc_timers_log_global_destroy(); } gpr_mu_unlock(&g_init_mu); } diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 8e56868d42..96b2fe04fa 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -205,12 +205,11 @@ static grpc_transport_setup_result complete_setup(void *channel_stack, - perform handshakes */ grpc_channel *grpc_secure_channel_create_internal( const char *target, const grpc_channel_args *args, - grpc_channel_security_context *context) { + grpc_channel_security_context *context, grpc_mdctx *mdctx) { setup *s; grpc_channel *channel; grpc_arg context_arg; grpc_channel_args *args_copy; - grpc_mdctx *mdctx = grpc_mdctx_create(); #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; int n = 0; diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 79cce553fa..708bb06c7f 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -171,13 +171,15 @@ static gpr_uint8 *add_tiny_header_data(framer_state *st, int len) { return gpr_slice_buffer_tiny_add(st->output, len); } -static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) { +/* add an element to the decoder table: returns metadata element to unref */ +static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c, + grpc_mdelem *elem) { gpr_uint32 key_hash = elem->key->hash; gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash); gpr_uint32 new_index = c->tail_remote_index + c->table_elems + 1; gpr_uint32 elem_size = 32 + GPR_SLICE_LENGTH(elem->key->slice) + GPR_SLICE_LENGTH(elem->value->slice); - int drop_ref; + grpc_mdelem *elem_to_unref; /* Reserve space for this element in the remote table: if this overflows the current table, drop elements until it fits, matching the decompressor @@ -204,34 +206,32 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) { if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == elem) { /* already there: update with new index */ c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; - drop_ref = 1; + elem_to_unref = elem; } else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem) { /* already there (cuckoo): update with new index */ c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; - drop_ref = 1; + elem_to_unref = elem; } else if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == NULL) { /* not there, but a free element: add */ c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem; c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; - drop_ref = 0; + elem_to_unref = NULL; } else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == NULL) { /* not there (cuckoo), but a free element: add */ c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem; c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; - drop_ref = 0; + elem_to_unref = NULL; } else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] < c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) { /* not there: replace oldest */ - grpc_mdelem_unref(c->entries_elems[HASH_FRAGMENT_2(elem_hash)]); + elem_to_unref = c->entries_elems[HASH_FRAGMENT_2(elem_hash)]; c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem; c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; - drop_ref = 0; } else { /* not there: replace oldest */ - grpc_mdelem_unref(c->entries_elems[HASH_FRAGMENT_3(elem_hash)]); + elem_to_unref = c->entries_elems[HASH_FRAGMENT_3(elem_hash)]; c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem; c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; - drop_ref = 0; } /* do exactly the same for the key (so we can find by that again too) */ @@ -257,9 +257,7 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) { c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index; } - if (drop_ref) { - grpc_mdelem_unref(elem); - } + return elem_to_unref; } static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 index, @@ -348,9 +346,9 @@ static gpr_uint32 dynidx(grpc_chttp2_hpack_compressor *c, gpr_uint32 index) { c->table_elems - index; } -/* encode an mdelem, taking ownership of it */ -static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, - framer_state *st) { +/* encode an mdelem; returns metadata element to unref */ +static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c, + grpc_mdelem *elem, framer_state *st) { gpr_uint32 key_hash = elem->key->hash; gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash); size_t decoder_space_usage; @@ -366,8 +364,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, /* HIT: complete element (first cuckoo hash) */ emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]), st); - grpc_mdelem_unref(elem); - return; + return elem; } if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem && @@ -375,8 +372,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, /* HIT: complete element (second cuckoo hash) */ emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]), st); - grpc_mdelem_unref(elem); - return; + return elem; } /* should this elem be in the table? */ @@ -394,12 +390,12 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, /* HIT: key (first cuckoo hash) */ if (should_add_elem) { emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st); - add_elem(c, elem); + return add_elem(c, elem); } else { emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st); - grpc_mdelem_unref(elem); + return elem; } - return; + abort(); } indices_key = c->indices_keys[HASH_FRAGMENT_3(key_hash)]; @@ -408,23 +404,24 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, /* HIT: key (first cuckoo hash) */ if (should_add_elem) { emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st); - add_elem(c, elem); + return add_elem(c, elem); } else { emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st); - grpc_mdelem_unref(elem); + return elem; } - return; + abort(); } /* no elem, key in the table... fall back to literal emission */ if (should_add_elem) { emit_lithdr_incidx_v(c, elem, st); - add_elem(c, elem); + return add_elem(c, elem); } else { emit_lithdr_noidx_v(c, elem, st); - grpc_mdelem_unref(elem); + return elem; } + abort(); } #define STRLEN_LIT(x) (sizeof(x) - 1) @@ -433,11 +430,13 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline, framer_state *st) { char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE]; + grpc_mdelem *mdelem; grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now()), timeout_str); - hpack_enc(c, grpc_mdelem_from_metadata_strings( - c->mdctx, grpc_mdstr_ref(c->timeout_key_str), - grpc_mdstr_from_string(c->mdctx, timeout_str)), - st); + mdelem = grpc_mdelem_from_metadata_strings( + c->mdctx, grpc_mdstr_ref(c->timeout_key_str), + grpc_mdstr_from_string(c->mdctx, timeout_str)); + mdelem = hpack_enc(c, mdelem, st); + if (mdelem) grpc_mdelem_unref(mdelem); } gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id) { @@ -542,6 +541,9 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, grpc_stream_op *op; gpr_uint32 max_take_size; gpr_uint32 curop = 0; + gpr_uint32 unref_op; + grpc_mdctx *mdctx = compressor->mdctx; + int need_unref = 0; GPR_ASSERT(stream_id != 0); @@ -564,7 +566,12 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, curop++; break; case GRPC_OP_METADATA: - hpack_enc(compressor, op->data.metadata, &st); + /* Encode a metadata element; store the returned value, representing + a metadata element that needs to be unreffed back into the metadata + slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got + updated). After this loop, we'll do a batch unref of elements. */ + op->data.metadata = hpack_enc(compressor, op->data.metadata, &st); + need_unref |= op->data.metadata != NULL; curop++; break; case GRPC_OP_DEADLINE: @@ -601,4 +608,15 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, begin_frame(&st, DATA); } finish_frame(&st, 1, eof); + + if (need_unref) { + grpc_mdctx_lock(mdctx); + for (unref_op = 0; unref_op < curop; unref_op++) { + op = &ops[unref_op]; + if (op->type != GRPC_OP_METADATA) continue; + if (!op->data.metadata) continue; + grpc_mdctx_locked_mdelem_unref(mdctx, op->data.metadata); + } + grpc_mdctx_unlock(mdctx); + } } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 4c0394d46f..110a4b544f 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -834,13 +834,10 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id, static int prepare_write(transport *t) { stream *s; - gpr_slice_buffer tempbuf; gpr_uint32 window_delta; /* simple writes are queued to qbuf, and flushed here */ - tempbuf = t->qbuf; - t->qbuf = t->outbuf; - t->outbuf = tempbuf; + gpr_slice_buffer_swap(&t->qbuf, &t->outbuf); GPR_ASSERT(t->qbuf.count == 0); if (t->dirtied_local_settings && !t->sent_local_settings) { diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 066cc263a1..44f6591c95 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -34,10 +34,12 @@ #include "src/core/iomgr/sockaddr.h" #include "src/core/transport/metadata.h" +#include <assert.h> #include <stddef.h> #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/atm.h> #include <grpc/support/log.h> #include "src/core/support/murmur_hash.h" #include "src/core/transport/chttp2/bin_encoder.h" @@ -68,11 +70,12 @@ typedef struct internal_metadata { internal_string *key; internal_string *value; + gpr_atm refcnt; + /* private only data */ void *user_data; void (*destroy_user_data)(void *user_data); - gpr_uint32 refs; grpc_mdctx *context; struct internal_metadata *bucket_next; } internal_metadata; @@ -129,8 +132,8 @@ static void unlock(grpc_mdctx *ctx) { gpr_mu_unlock(&ctx->mu); } -static void ref_md(internal_metadata *md) { - if (0 == md->refs++) { +static void ref_md_locked(internal_metadata *md) { + if (0 == gpr_atm_no_barrier_fetch_add(&md->refcnt, 1)) { md->context->mdtab_free--; } } @@ -168,7 +171,7 @@ static void discard_metadata(grpc_mdctx *ctx) { for (i = 0; i < ctx->mdtab_capacity; i++) { cur = ctx->mdtab[i]; while (cur) { - GPR_ASSERT(cur->refs == 0); + GPR_ASSERT(gpr_atm_acq_load(&cur->refcnt) == 0); next = cur->bucket_next; internal_string_unref(cur->key); internal_string_unref(cur->value); @@ -349,7 +352,7 @@ static void gc_mdtab(grpc_mdctx *ctx) { prev_next = &ctx->mdtab[i]; for (md = ctx->mdtab[i]; md; md = next) { next = md->bucket_next; - if (md->refs == 0) { + if (gpr_atm_acq_load(&md->refcnt) == 0) { internal_string_unref(md->key); internal_string_unref(md->value); if (md->user_data) { @@ -415,7 +418,7 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx, /* search for an existing pair */ for (md = ctx->mdtab[hash % ctx->mdtab_capacity]; md; md = md->bucket_next) { if (md->key == key && md->value == value) { - ref_md(md); + ref_md_locked(md); internal_string_unref(key); internal_string_unref(value); unlock(ctx); @@ -425,7 +428,7 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx, /* not found: create a new pair */ md = gpr_malloc(sizeof(internal_metadata)); - md->refs = 1; + gpr_atm_rel_store(&md->refcnt, 1); md->context = ctx; md->key = key; md->value = value; @@ -468,10 +471,12 @@ grpc_mdelem *grpc_mdelem_from_string_and_buffer(grpc_mdctx *ctx, grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd) { internal_metadata *md = (internal_metadata *)gmd; - grpc_mdctx *ctx = md->context; - lock(ctx); - ref_md(md); - unlock(ctx); + /* we can assume the ref count is >= 1 as the application is calling + this function - meaning that no adjustment to mdtab_free is necessary, + simplifying the logic here to be just an atomic increment */ + /* use C assert to have this removed in opt builds */ + assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1); + gpr_atm_no_barrier_fetch_add(&md->refcnt, 1); return gmd; } @@ -479,8 +484,8 @@ void grpc_mdelem_unref(grpc_mdelem *gmd) { internal_metadata *md = (internal_metadata *)gmd; grpc_mdctx *ctx = md->context; lock(ctx); - GPR_ASSERT(md->refs); - if (0 == --md->refs) { + assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1); + if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) { ctx->mdtab_free++; } unlock(ctx); @@ -550,3 +555,17 @@ gpr_slice grpc_mdstr_as_base64_encoded_and_huffman_compressed(grpc_mdstr *gs) { unlock(ctx); return slice; } + +void grpc_mdctx_lock(grpc_mdctx *ctx) { lock(ctx); } + +void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, grpc_mdelem *gmd) { + internal_metadata *md = (internal_metadata *)gmd; + grpc_mdctx *elem_ctx = md->context; + GPR_ASSERT(ctx == elem_ctx); + assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1); + if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) { + ctx->mdtab_free++; + } +} + +void grpc_mdctx_unlock(grpc_mdctx *ctx) { unlock(ctx); } diff --git a/src/core/transport/metadata.h b/src/core/transport/metadata.h index b8afbeb1e3..21b8ae2b78 100644 --- a/src/core/transport/metadata.h +++ b/src/core/transport/metadata.h @@ -135,6 +135,18 @@ void grpc_mdelem_unref(grpc_mdelem *md); Does not promise that the returned string has no embedded nulls however. */ const char *grpc_mdstr_as_c_string(grpc_mdstr *s); +/* Batch mode metadata functions. + These API's have equivalents above, but allow taking the mdctx just once, + performing a bunch of work, and then leaving the mdctx. */ + +/* Lock the metadata context: it's only safe to call _locked_ functions against + this context from the calling thread until grpc_mdctx_unlock is called */ +void grpc_mdctx_lock(grpc_mdctx *ctx); +/* Unref a metadata element */ +void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, grpc_mdelem *elem); +/* Unlock the metadata context */ +void grpc_mdctx_unlock(grpc_mdctx *ctx); + #define GRPC_MDSTR_KV_HASH(k_hash, v_hash) (GPR_ROTL((k_hash), 2) ^ (v_hash)) #endif /* GRPC_INTERNAL_CORE_TRANSPORT_METADATA_H */ diff --git a/src/node/examples/math_server.js b/src/node/examples/math_server.js index ae548c89e4..3fac193d64 100644 --- a/src/node/examples/math_server.js +++ b/src/node/examples/math_server.js @@ -33,10 +33,6 @@ 'use strict'; -var util = require('util'); - -var Transform = require('stream').Transform; - var grpc = require('..'); var math = grpc.load(__dirname + '/math.proto').math; @@ -54,11 +50,12 @@ function mathDiv(call, cb) { // Unary + is explicit coersion to integer if (+req.divisor === 0) { cb(new Error('cannot divide by zero')); + } else { + cb(null, { + quotient: req.dividend / req.divisor, + remainder: req.dividend % req.divisor + }); } - cb(null, { - quotient: req.dividend / req.divisor, - remainder: req.dividend % req.divisor - }); } /** @@ -97,24 +94,19 @@ function mathSum(call, cb) { } function mathDivMany(stream) { - // Here, call is a standard duplex Node object Stream - util.inherits(DivTransform, Transform); - function DivTransform() { - var options = {objectMode: true}; - Transform.call(this, options); - } - DivTransform.prototype._transform = function(div_args, encoding, callback) { + stream.on('data', function(div_args) { if (+div_args.divisor === 0) { - callback(new Error('cannot divide by zero')); + stream.emit('error', new Error('cannot divide by zero')); + } else { + stream.write({ + quotient: div_args.dividend / div_args.divisor, + remainder: div_args.dividend % div_args.divisor + }); } - callback(null, { - quotient: div_args.dividend / div_args.divisor, - remainder: div_args.dividend % div_args.divisor - }); - }; - var transform = new DivTransform(); - stream.pipe(transform); - transform.pipe(stream); + }); + stream.on('end', function() { + stream.end(); + }); } var server = new Server({ diff --git a/src/node/package.json b/src/node/package.json index 9f52f8c988..fc3ca1f103 100644 --- a/src/node/package.json +++ b/src/node/package.json @@ -1,6 +1,6 @@ { "name": "grpc", - "version": "0.6.0", + "version": "0.6.1", "author": "Google Inc.", "description": "gRPC Library for Node", "homepage": "http://www.grpc.io/", diff --git a/src/node/src/server.js b/src/node/src/server.js index 05de16294d..eef705c44c 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -360,7 +360,9 @@ function handleUnary(call, handler, metadata) { } handler.func(emitter, function sendUnaryData(err, value, trailer) { if (err) { - err.metadata = trailer; + if (trailer) { + err.metadata = trailer; + } handleError(call, err); } else { sendUnaryResponse(call, value, handler.serialize, trailer); @@ -406,7 +408,9 @@ function handleClientStreaming(call, handler, metadata) { handler.func(stream, function(err, value, trailer) { stream.terminate(); if (err) { - err.metadata = trailer; + if (trailer) { + err.metadata = trailer; + } handleError(call, err); } else { sendUnaryResponse(call, value, handler.serialize, trailer); diff --git a/src/node/test/math_client_test.js b/src/node/test/math_client_test.js index d83f64116f..79df97871b 100644 --- a/src/node/test/math_client_test.js +++ b/src/node/test/math_client_test.js @@ -68,6 +68,13 @@ describe('Math client', function() { done(); }); }); + it('should handle an error from a unary request', function(done) { + var arg = {dividend: 7, divisor: 0}; + math_client.div(arg, function handleDivResult(err, value) { + assert(err); + done(); + }); + }); it('should handle a server streaming request', function(done) { var call = math_client.fib({limit: 7}); var expected_results = [1, 1, 2, 3, 5, 8, 13]; @@ -115,4 +122,17 @@ describe('Math client', function() { done(); }); }); + it('should handle an error from a bidi request', function(done) { + var call = math_client.divMany(); + call.on('data', function(value) { + assert.fail(value, undefined, 'Unexpected data response on failing call', + '!='); + }); + call.write({dividend: 7, divisor: 0}); + call.end(); + call.on('status', function checkStatus(status) { + assert.notEqual(status.code, grpc.status.OK); + done(); + }); + }); }); diff --git a/src/python/interop/interop/client.py b/src/python/interop/interop/client.py index bae5e17460..41f0d94539 100644 --- a/src/python/interop/interop/client.py +++ b/src/python/interop/interop/client.py @@ -64,7 +64,7 @@ def _args(): return parser.parse_args() def _oauth_access_token(args): - credentials = client.GoogleCredentials.get_application_default() + credentials = oauth2client_client.GoogleCredentials.get_application_default() scoped_credentials = credentials.create_scoped([args.oauth_scope]) return scoped_credentials.get_access_token().access_token diff --git a/src/python/interop/interop/methods.py b/src/python/interop/interop/methods.py index c69771dff1..909b738bd1 100644 --- a/src/python/interop/interop/methods.py +++ b/src/python/interop/interop/methods.py @@ -292,7 +292,7 @@ def _service_account_creds(stub, args): if wanted_email != response.username: raise ValueError( 'expected username %s, got %s' % (wanted_email, response.username)) - if response.oauth_scope in args.oauth_scope: + if args.oauth_scope.find(response.oauth_scope) == -1: raise ValueError( 'expected to find oauth scope "%s" in received "%s"' % (response.oauth_scope, args.oauth_scope)) diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py index 35456d38c6..f3f2a043eb 100644 --- a/src/python/src/grpc/early_adopter/implementations.py +++ b/src/python/src/grpc/early_adopter/implementations.py @@ -223,7 +223,8 @@ def stub( breakdown = _face_utilities.break_down_invocation(service_name, methods) return _Stub( breakdown, host, port, secure, root_certificates, private_key, - certificate_chain, server_host_override=server_host_override) + certificate_chain, server_host_override=server_host_override, + metadata_transformer=metadata_transformer) def server( diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml index d5bb55e5a8..d9fe0a5835 100644 --- a/src/ruby/.rubocop_todo.yml +++ b/src/ruby/.rubocop_todo.yml @@ -1,42 +1,30 @@ # This configuration was generated by `rubocop --auto-gen-config` -# on 2015-01-16 02:30:04 -0800 using RuboCop version 0.28.0. +# on 2015-04-14 09:35:44 -0700 using RuboCop version 0.29.1. # The point is for the user to remove these configuration records # one by one as the offenses are removed from the code base. # Note that changes in the inspected code, or installation of new # versions of RuboCop, may require this file to be generated again. -# Offense count: 3 -# Lint/UselessAssignment: -# Enabled: false - -# Offense count: 33 +# Offense count: 32 Metrics/AbcSize: - Max: 39 + Max: 36 -# Offense count: 3 +# Offense count: 2 # Configuration parameters: CountComments. Metrics/ClassLength: - Max: 231 - -# Offense count: 2 -Metrics/CyclomaticComplexity: - Max: 8 + Max: 183 -# Offense count: 36 +# Offense count: 35 # Configuration parameters: CountComments. Metrics/MethodLength: - Max: 37 + Max: 36 -# Offense count: 8 +# Offense count: 7 # Configuration parameters: CountKeywordArgs. Metrics/ParameterLists: Max: 8 -# Offense count: 2 -Metrics/PerceivedComplexity: - Max: 10 - -# Offense count: 7 +# Offense count: 6 # Configuration parameters: AllowedVariables. Style/GlobalVars: Enabled: false @@ -50,3 +38,7 @@ Style/Next: # Configuration parameters: Methods. Style/SingleLineBlockParams: Enabled: false + +# Offense count: 1 +Style/StructInheritance: + Enabled: false diff --git a/src/ruby/CHANGELOG.md b/src/ruby/CHANGELOG.md new file mode 100644 index 0000000000..8ec6e3cfdb --- /dev/null +++ b/src/ruby/CHANGELOG.md @@ -0,0 +1,11 @@ +## 0.6.1 (2015-04-14) + +### Changes + +* Begins this ChangeLog ([@tbetbetbe][]) +* Updates to version 0.4 of googleauth. ([@tbetbetbe][]) +* Switch the extension to use the call API. ([@tbetbetbe][]) +* Refactor the C extension to avoid identifiers used by ruby ([@yugui][]) + +[@tbetbetbe]: https://github.com/tbetbetbe +[@yugui]: https://github.com/yugui diff --git a/src/ruby/bin/apis/pubsub_demo.rb b/src/ruby/bin/apis/pubsub_demo.rb index 9bb324ff64..6d69b0f21e 100755 --- a/src/ruby/bin/apis/pubsub_demo.rb +++ b/src/ruby/bin/apis/pubsub_demo.rb @@ -71,7 +71,7 @@ end # Builds the metadata authentication update proc. def auth_proc(opts) - auth_creds = Google::Auth.get_application_default(opts.oauth_scope) + auth_creds = Google::Auth.get_application_default return auth_creds.updater_proc end @@ -213,17 +213,14 @@ class NamedActions end # Args is used to hold the command line info. -Args = Struct.new(:host, :oauth_scope, :port, :action, :project_id, :topic_name, +Args = Struct.new(:host, :port, :action, :project_id, :topic_name, :sub_name) # validates the the command line options, returning them as an Arg. def parse_args args = Args.new('pubsub-staging.googleapis.com', - 'https://www.googleapis.com/auth/pubsub', 443, 'list_some_topics', 'stoked-keyword-656') OptionParser.new do |opts| - opts.on('--oauth_scope scope', - 'Scope for OAuth tokens') { |v| args['oauth_scope'] = v } opts.on('--server_host SERVER_HOST', 'server hostname') do |v| args.host = v end @@ -250,7 +247,7 @@ def parse_args end def _check_args(args) - %w(host port action oauth_scope).each do |a| + %w(host port action).each do |a| if args[a].nil? raise OptionParser::MissingArgument.new("please specify --#{a}") end diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index b2a8711c79..af7a1d5b15 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -110,6 +110,11 @@ def create_stub(opts) end end + if opts.test_case == 'jwt_token_creds' # don't use a scope + auth_creds = Google::Auth.get_application_default + stub_opts[:update_metadata] = auth_creds.updater_proc + end + logger.info("... connecting securely to #{address}") Grpc::Testing::TestService::Stub.new(address, **stub_opts) else @@ -201,6 +206,15 @@ class NamedTests p 'OK: service_account_creds' end + def jwt_token_creds + json_key = File.read(ENV[AUTH_ENV]) + wanted_email = MultiJson.load(json_key)['client_email'] + resp = perform_large_unary(fill_username: true) + assert_equal(wanted_email, resp.username, + 'service_account_creds: incorrect username') + p 'OK: jwt_token_creds' + end + def compute_engine_creds resp = perform_large_unary(fill_username: true, fill_oauth_scope: true) diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec index 45cbacfeb0..a50d0351da 100755 --- a/src/ruby/grpc.gemspec +++ b/src/ruby/grpc.gemspec @@ -22,7 +22,7 @@ Gem::Specification.new do |s| s.platform = Gem::Platform::RUBY s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1' - s.add_dependency 'googleauth', '~> 0.1' + s.add_dependency 'googleauth', '~> 0.4' s.add_dependency 'logging', '~> 1.8' s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests s.add_dependency 'xray', '~> 1.1' diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index 6547a1499e..dc7672d359 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -52,6 +52,14 @@ module GRPC Core::Channel.new(host, kw, creds) end + def self.update_with_jwt_aud_uri(a_hash, host, method) + last_slash_idx, res = method.rindex('/'), a_hash.clone + return res if last_slash_idx.nil? + service_name = method[0..(last_slash_idx - 1)] + res[:jwt_aud_uri] = "https://#{host}#{service_name}" + res + end + # check_update_metadata is used by #initialize verify that it's a Proc. def self.check_update_metadata(update_metadata) return update_metadata if update_metadata.nil? @@ -147,7 +155,8 @@ module GRPC def request_response(method, req, marshal, unmarshal, timeout = nil, return_op: false, **kw) c = new_active_call(method, marshal, unmarshal, timeout) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.request_response(req, **md) unless return_op # return the operation view of the active_call; define #execute as a @@ -204,7 +213,8 @@ module GRPC def client_streamer(method, requests, marshal, unmarshal, timeout = nil, return_op: false, **kw) c = new_active_call(method, marshal, unmarshal, timeout) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.client_streamer(requests, **md) unless return_op # return the operation view of the active_call; define #execute as a @@ -270,7 +280,8 @@ module GRPC def server_streamer(method, req, marshal, unmarshal, timeout = nil, return_op: false, **kw, &blk) c = new_active_call(method, marshal, unmarshal, timeout) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.server_streamer(req, **md, &blk) unless return_op # return the operation view of the active_call; define #execute @@ -375,7 +386,8 @@ module GRPC def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil, return_op: false, **kw, &blk) c = new_active_call(method, marshal, unmarshal, timeout) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.bidi_streamer(requests, **md, &blk) unless return_op # return the operation view of the active_call; define #execute diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index bfd0cbb393..072fb9b1aa 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -29,5 +29,5 @@ # GRPC contains the General RPC module. module GRPC - VERSION = '0.6.0' + VERSION = '0.6.1' end diff --git a/src/ruby/spec/channel_spec.rb b/src/ruby/spec/channel_spec.rb index 31e38d71b8..d471ff5db6 100644 --- a/src/ruby/spec/channel_spec.rb +++ b/src/ruby/spec/channel_spec.rb @@ -58,7 +58,7 @@ describe GRPC::Core::Channel do it 'does not take a hash with bad values as channel args' do blk = construct_with_args(symbol: Object.new) expect(&blk).to raise_error TypeError - blk = construct_with_args('1' => Hash.new) + blk = construct_with_args('1' => {}) expect(&blk).to raise_error TypeError end diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index 1a2afbe1f9..68af79f907 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -192,11 +192,11 @@ shared_examples 'GRPC metadata delivery works OK' do describe 'from client => server' do before(:example) do n = 7 # arbitrary number of metadata - diff_keys_fn = proc { |i| [sprintf('k%d', i), sprintf('v%d', i)] } + diff_keys_fn = proc { |i| [format('k%d', i), format('v%d', i)] } diff_keys = Hash[n.times.collect { |x| diff_keys_fn.call x }] - null_vals_fn = proc { |i| [sprintf('k%d', i), sprintf('v\0%d', i)] } + null_vals_fn = proc { |i| [format('k%d', i), format('v\0%d', i)] } null_vals = Hash[n.times.collect { |x| null_vals_fn.call x }] - same_keys_fn = proc { |i| [sprintf('k%d', i), [sprintf('v%d', i)] * n] } + same_keys_fn = proc { |i| [format('k%d', i), [format('v%d', i)] * n] } same_keys = Hash[n.times.collect { |x| same_keys_fn.call x }] symbol_key = { a_key: 'a val' } @valid_metadata = [diff_keys, same_keys, null_vals, symbol_key] @@ -242,11 +242,11 @@ shared_examples 'GRPC metadata delivery works OK' do describe 'from server => client' do before(:example) do n = 7 # arbitrary number of metadata - diff_keys_fn = proc { |i| [sprintf('k%d', i), sprintf('v%d', i)] } + diff_keys_fn = proc { |i| [format('k%d', i), format('v%d', i)] } diff_keys = Hash[n.times.collect { |x| diff_keys_fn.call x }] - null_vals_fn = proc { |i| [sprintf('k%d', i), sprintf('v\0%d', i)] } + null_vals_fn = proc { |i| [format('k%d', i), format('v\0%d', i)] } null_vals = Hash[n.times.collect { |x| null_vals_fn.call x }] - same_keys_fn = proc { |i| [sprintf('k%d', i), [sprintf('v%d', i)] * n] } + same_keys_fn = proc { |i| [format('k%d', i), [format('v%d', i)] * n] } same_keys = Hash[n.times.collect { |x| same_keys_fn.call x }] symbol_key = { a_key: 'a val' } @valid_metadata = [diff_keys, same_keys, null_vals, symbol_key] diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index f409d73e2f..245999ea03 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -400,7 +400,8 @@ describe GRPC::RpcServer do end stub = EchoStub.new(@host, **@client_opts) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) - wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2' }] + wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2', + 'jwt_aud_uri' => "https://#{@host}/EchoService" }] expect(service.received_md).to eq(wanted_md) @srv.stop t.join diff --git a/src/ruby/spec/server_spec.rb b/src/ruby/spec/server_spec.rb index a47e484f97..bb566d1b1f 100644 --- a/src/ruby/spec/server_spec.rb +++ b/src/ruby/spec/server_spec.rb @@ -152,7 +152,7 @@ describe Server do it 'does not take a hash with bad values as channel args' do blk = construct_with_args(symbol: Object.new) expect(&blk).to raise_error TypeError - blk = construct_with_args('1' => Hash.new) + blk = construct_with_args('1' => {}) expect(&blk).to raise_error TypeError end |