From 0e3969874ae731ae2278b29a426dd83d2f95470b Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 30 Aug 2017 16:23:41 -0700 Subject: Allow unreachable code on iPhone --- include/grpc/impl/codegen/port_platform.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include') diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h index e84a75d295..8d8dcee3b0 100644 --- a/include/grpc/impl/codegen/port_platform.h +++ b/include/grpc/impl/codegen/port_platform.h @@ -183,7 +183,7 @@ #define _BSD_SOURCE #endif #if TARGET_OS_IPHONE -#define GPR_FORBID_UNREACHABLE_CODE 1 +#define GPR_FORBID_UNREACHABLE_CODE 0 #define GPR_PLATFORM_STRING "ios" #define GPR_CPU_IPHONE 1 #define GPR_PTHREAD_TLS 1 -- cgit v1.2.3 From 2caf021772ee241da3366e7dfd32aa4ee1134092 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 1 Sep 2017 15:04:13 -0700 Subject: Change plugin credentials API to support both sync and async modes. --- include/grpc/grpc_security.h | 35 ++++- .../credentials/plugin/plugin_credentials.c | 143 ++++++++++++++------- src/cpp/client/secure_credentials.cc | 68 ++++++++-- src/cpp/client/secure_credentials.h | 17 ++- src/csharp/ext/grpc_csharp_ext.c | 8 +- src/node/ext/call_credentials.cc | 10 +- src/php/ext/grpc/call_credentials.c | 45 +++++-- .../grpc/_cython/_cygrpc/credentials.pxd.pxi | 7 +- .../grpc/_cython/_cygrpc/credentials.pyx.pxi | 9 +- src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi | 7 +- src/ruby/ext/grpc/rb_call_credentials.c | 8 +- test/core/security/credentials_test.c | 43 ++++--- 12 files changed, 280 insertions(+), 120 deletions(-) (limited to 'include') diff --git a/include/grpc/grpc_security.h b/include/grpc/grpc_security.h index 2005e25df2..42e2ab3111 100644 --- a/include/grpc/grpc_security.h +++ b/include/grpc/grpc_security.h @@ -249,19 +249,40 @@ typedef struct { void *reserved; } grpc_auth_metadata_context; +/** Maximum number of credentials returnable by a credentials plugin via + a synchronous return. */ +#define GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX 4 + /** grpc_metadata_credentials plugin is an API user provided structure used to create grpc_credentials objects that can be set on a channel (composed) or a call. See grpc_credentials_metadata_create_from_plugin below. The grpc client stack will call the get_metadata method of the plugin for every call in scope for the credentials created from it. */ typedef struct { - /** The implementation of this method has to be non-blocking. - - context is the information that can be used by the plugin to create auth - metadata. - - cb is the callback that needs to be called when the metadata is ready. - - user_data needs to be passed as the first parameter of the callback. */ - void (*get_metadata)(void *state, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, void *user_data); + /** The implementation of this method has to be non-blocking, but can + be performed synchronously or asynchronously. + + If processing occurs synchronously, returns non-zero and populates + creds_md, num_creds_md, status, and error_details. In this case, + the caller takes ownership of the entries in creds_md and of + error_details. Note that if the plugin needs to return more than + GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX entries in creds_md, it must + return asynchronously. + + If processing occurs asynchronously, returns zero and invokes \a cb + when processing is completed. \a user_data will be passed as the + first parameter of the callback. NOTE: \a cb MUST be invoked in a + different thread, not from the thread in which \a get_metadata() is + invoked. + + \a context is the information that can be used by the plugin to create + auth metadata. */ + int (*get_metadata)( + void *state, grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details); /** Destroys the plugin state. */ void (*destroy)(void *state); diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.c b/src/core/lib/security/credentials/plugin/plugin_credentials.c index 73e0c23e0f..e844bff79c 100644 --- a/src/core/lib/security/credentials/plugin/plugin_credentials.c +++ b/src/core/lib/security/credentials/plugin/plugin_credentials.c @@ -53,6 +53,63 @@ static void pending_request_remove_locked( } } +// Checks if the request has been cancelled. +// If not, removes it from the pending list, so that it cannot be +// cancelled out from under us. +// When this returns, r->cancelled indicates whether the request was +// cancelled before completion. +static void pending_request_complete( + grpc_exec_ctx *exec_ctx, grpc_plugin_credentials_pending_request *r) { + gpr_mu_lock(&r->creds->mu); + if (!r->cancelled) pending_request_remove_locked(r->creds, r); + gpr_mu_unlock(&r->creds->mu); + // Ref to credentials not needed anymore. + grpc_call_credentials_unref(exec_ctx, &r->creds->base); +} + +static grpc_error *process_plugin_result( + grpc_exec_ctx *exec_ctx, grpc_plugin_credentials_pending_request *r, + const grpc_metadata *md, size_t num_md, grpc_status_code status, + const char *error_details) { + grpc_error *error = GRPC_ERROR_NONE; + if (status != GRPC_STATUS_OK) { + char *msg; + gpr_asprintf(&msg, "Getting metadata from plugin failed with error: %s", + error_details); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); + gpr_free(msg); + } else { + bool seen_illegal_header = false; + for (size_t i = 0; i < num_md; ++i) { + if (!GRPC_LOG_IF_ERROR("validate_metadata_from_plugin", + grpc_validate_header_key_is_legal(md[i].key))) { + seen_illegal_header = true; + break; + } else if (!grpc_is_binary_header(md[i].key) && + !GRPC_LOG_IF_ERROR( + "validate_metadata_from_plugin", + grpc_validate_header_nonbin_value_is_legal( + md[i].value))) { + gpr_log(GPR_ERROR, "Plugin added invalid metadata value."); + seen_illegal_header = true; + break; + } + } + if (seen_illegal_header) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Illegal metadata"); + } else { + for (size_t i = 0; i < num_md; ++i) { + grpc_mdelem mdelem = grpc_mdelem_from_slices( + exec_ctx, grpc_slice_ref_internal(md[i].key), + grpc_slice_ref_internal(md[i].value)); + grpc_credentials_mdelem_array_add(r->md_array, mdelem); + GRPC_MDELEM_UNREF(exec_ctx, mdelem); + } + } + } + return error; +} + static void plugin_md_request_metadata_ready(void *request, const grpc_metadata *md, size_t num_md, @@ -64,54 +121,13 @@ static void plugin_md_request_metadata_ready(void *request, NULL, NULL); grpc_plugin_credentials_pending_request *r = (grpc_plugin_credentials_pending_request *)request; - // Check if the request has been cancelled. - // If not, remove it from the pending list, so that it cannot be - // cancelled out from under us. - gpr_mu_lock(&r->creds->mu); - if (!r->cancelled) pending_request_remove_locked(r->creds, r); - gpr_mu_unlock(&r->creds->mu); - grpc_call_credentials_unref(&exec_ctx, &r->creds->base); + // Remove request from pending list if not previously cancelled. + pending_request_complete(&exec_ctx, r); // If it has not been cancelled, process it. if (!r->cancelled) { - if (status != GRPC_STATUS_OK) { - char *msg; - gpr_asprintf(&msg, "Getting metadata from plugin failed with error: %s", - error_details); - GRPC_CLOSURE_SCHED(&exec_ctx, r->on_request_metadata, - GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg)); - gpr_free(msg); - } else { - bool seen_illegal_header = false; - for (size_t i = 0; i < num_md; ++i) { - if (!GRPC_LOG_IF_ERROR("validate_metadata_from_plugin", - grpc_validate_header_key_is_legal(md[i].key))) { - seen_illegal_header = true; - break; - } else if (!grpc_is_binary_header(md[i].key) && - !GRPC_LOG_IF_ERROR( - "validate_metadata_from_plugin", - grpc_validate_header_nonbin_value_is_legal( - md[i].value))) { - gpr_log(GPR_ERROR, "Plugin added invalid metadata value."); - seen_illegal_header = true; - break; - } - } - if (seen_illegal_header) { - GRPC_CLOSURE_SCHED( - &exec_ctx, r->on_request_metadata, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Illegal metadata")); - } else { - for (size_t i = 0; i < num_md; ++i) { - grpc_mdelem mdelem = grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_ref_internal(md[i].key), - grpc_slice_ref_internal(md[i].value)); - grpc_credentials_mdelem_array_add(r->md_array, mdelem); - GRPC_MDELEM_UNREF(&exec_ctx, mdelem); - } - GRPC_CLOSURE_SCHED(&exec_ctx, r->on_request_metadata, GRPC_ERROR_NONE); - } - } + grpc_error *error = + process_plugin_result(&exec_ctx, r, md, num_md, status, error_details); + GRPC_CLOSURE_SCHED(&exec_ctx, r->on_request_metadata, error); } gpr_free(r); grpc_exec_ctx_finish(&exec_ctx); @@ -125,6 +141,7 @@ static bool plugin_get_request_metadata(grpc_exec_ctx *exec_ctx, grpc_closure *on_request_metadata, grpc_error **error) { grpc_plugin_credentials *c = (grpc_plugin_credentials *)creds; + bool retval = true; // Synchronous return. if (c->plugin.get_metadata != NULL) { // Create pending_request object. grpc_plugin_credentials_pending_request *pending_request = @@ -143,11 +160,37 @@ static bool plugin_get_request_metadata(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&c->mu); // Invoke the plugin. The callback holds a ref to us. grpc_call_credentials_ref(creds); - c->plugin.get_metadata(c->plugin.state, context, - plugin_md_request_metadata_ready, pending_request); - return false; + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX]; + size_t num_creds_md = 0; + grpc_status_code status = GRPC_STATUS_OK; + const char *error_details = NULL; + if (!c->plugin.get_metadata(c->plugin.state, context, + plugin_md_request_metadata_ready, + pending_request, creds_md, &num_creds_md, + &status, &error_details)) { + return false; // Asynchronous return. + } + // Returned synchronously. + // Remove request from pending list if not previously cancelled. + pending_request_complete(exec_ctx, pending_request); + // If the request was cancelled, the error will have been returned + // asynchronously by plugin_cancel_get_request_metadata(), so return + // false. Otherwise, process the result. + if (pending_request->cancelled) { + retval = false; + } else { + *error = process_plugin_result(exec_ctx, pending_request, creds_md, + num_creds_md, status, error_details); + } + // Clean up. + for (size_t i = 0; i < num_creds_md; ++i) { + grpc_slice_unref_internal(exec_ctx, creds_md[i].key); + grpc_slice_unref_internal(exec_ctx, creds_md[i].value); + } + gpr_free((void *)error_details); + gpr_free(pending_request); } - return true; + return retval; } static void plugin_cancel_get_request_metadata( diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 057a058a3f..00104165a5 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include "src/cpp/client/create_channel_internal.h" #include "src/cpp/common/secure_auth_context.h" @@ -157,28 +158,50 @@ void MetadataCredentialsPluginWrapper::Destroy(void* wrapper) { delete w; } -void MetadataCredentialsPluginWrapper::GetMetadata( +int MetadataCredentialsPluginWrapper::GetMetadata( void* wrapper, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, void* user_data) { + grpc_credentials_plugin_metadata_cb cb, void* user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) { GPR_ASSERT(wrapper); MetadataCredentialsPluginWrapper* w = reinterpret_cast(wrapper); if (!w->plugin_) { - cb(user_data, NULL, 0, GRPC_STATUS_OK, NULL); - return; + *num_creds_md = 0; + *status = GRPC_STATUS_OK; + *error_details = nullptr; + return true; } if (w->plugin_->IsBlocking()) { + // Asynchronous return. w->thread_pool_->Add( std::bind(&MetadataCredentialsPluginWrapper::InvokePlugin, w, context, - cb, user_data)); + cb, user_data, nullptr, nullptr, nullptr, nullptr)); + return false; } else { - w->InvokePlugin(context, cb, user_data); + // Synchronous return. + w->InvokePlugin(context, cb, user_data, creds_md, num_creds_md, status, + error_details); + return true; } } +namespace { + +void UnrefMetadata(const std::vector& md) { + for (auto it = md.begin(); it != md.end(); ++it) { + grpc_slice_unref(it->key); + grpc_slice_unref(it->value); + } +} + +} // namespace + void MetadataCredentialsPluginWrapper::InvokePlugin( grpc_auth_metadata_context context, grpc_credentials_plugin_metadata_cb cb, - void* user_data) { + void* user_data, grpc_metadata creds_md[4], size_t *num_creds_md, + grpc_status_code *status_code, const char **error_details) { std::multimap metadata; // const_cast is safe since the SecureAuthContext does not take owndership and @@ -196,12 +219,31 @@ void MetadataCredentialsPluginWrapper::InvokePlugin( md_entry.flags = 0; md.push_back(md_entry); } - cb(user_data, md.empty() ? nullptr : &md[0], md.size(), - static_cast(status.error_code()), - status.error_message().c_str()); - for (auto it = md.begin(); it != md.end(); ++it) { - grpc_slice_unref(it->key); - grpc_slice_unref(it->value); + if (creds_md != nullptr) { + // Synchronous return. + if (md.size() > GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX) { + *num_creds_md = 0; + *status_code = GRPC_STATUS_INTERNAL; + *error_details = gpr_strdup( + "blocking plugin credentials returned too many metadata keys"); + UnrefMetadata(md); + } else { + for (const auto& elem : md) { + creds_md[*num_creds_md].key = elem.key; + creds_md[*num_creds_md].value = elem.value; + creds_md[*num_creds_md].flags = elem.flags; + ++(*num_creds_md); + } + *status_code = static_cast(status.error_code()); + *error_details = + status.ok() ? nullptr : gpr_strdup(status.error_message().c_str()); + } + } else { + // Asynchronous return. + cb(user_data, md.empty() ? nullptr : &md[0], md.size(), + static_cast(status.error_code()), + status.error_message().c_str()); + UnrefMetadata(md); } } diff --git a/src/cpp/client/secure_credentials.h b/src/cpp/client/secure_credentials.h index 66547c736b..fdabaca84f 100644 --- a/src/cpp/client/secure_credentials.h +++ b/src/cpp/client/secure_credentials.h @@ -58,16 +58,23 @@ class SecureCallCredentials final : public CallCredentials { class MetadataCredentialsPluginWrapper final : private GrpcLibraryCodegen { public: static void Destroy(void* wrapper); - static void GetMetadata(void* wrapper, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, - void* user_data); + static int GetMetadata( + void* wrapper, grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void* user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details); explicit MetadataCredentialsPluginWrapper( std::unique_ptr plugin); private: - void InvokePlugin(grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, void* user_data); + void InvokePlugin( + grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void* user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details); std::unique_ptr thread_pool_; std::unique_ptr plugin_; }; diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index aebce364c5..b2c8ca63f4 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -1023,13 +1023,17 @@ typedef void(GPR_CALLTYPE *grpcsharp_metadata_interceptor_func)( grpc_credentials_plugin_metadata_cb cb, void *user_data, int32_t is_destroy); -static void grpcsharp_get_metadata_handler( +static int grpcsharp_get_metadata_handler( void *state, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, void *user_data) { + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) { grpcsharp_metadata_interceptor_func interceptor = (grpcsharp_metadata_interceptor_func)(intptr_t)state; interceptor(state, context.service_url, context.method_name, cb, user_data, 0); + return 0; /* Asynchronous return. */ } static void grpcsharp_metadata_credentials_destroy_handler(void *state) { diff --git a/src/node/ext/call_credentials.cc b/src/node/ext/call_credentials.cc index 4cf3e565ef..0644a812e9 100644 --- a/src/node/ext/call_credentials.cc +++ b/src/node/ext/call_credentials.cc @@ -238,9 +238,12 @@ NAUV_WORK_CB(SendPluginCallback) { } } -void plugin_get_metadata(void *state, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, - void *user_data) { +int plugin_get_metadata( + void *state, grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) { plugin_state *p_state = reinterpret_cast(state); plugin_callback_data *data = new plugin_callback_data; data->service_url = context.service_url; @@ -252,6 +255,7 @@ void plugin_get_metadata(void *state, grpc_auth_metadata_context context, uv_mutex_unlock(&p_state->plugin_mutex); uv_async_send(&p_state->plugin_async); + return 0; // Async processing. } void plugin_uv_close_cb(uv_handle_t *handle) { diff --git a/src/php/ext/grpc/call_credentials.c b/src/php/ext/grpc/call_credentials.c index 1eee8645df..8bf5120538 100644 --- a/src/php/ext/grpc/call_credentials.c +++ b/src/php/ext/grpc/call_credentials.c @@ -143,9 +143,12 @@ PHP_METHOD(CallCredentials, createFromPlugin) { } /* Callback function for plugin creds API */ -void plugin_get_metadata(void *ptr, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, - void *user_data) { +int plugin_get_metadata( + void *ptr, grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) { TSRMLS_FETCH(); plugin_state *state = (plugin_state *)ptr; @@ -175,15 +178,19 @@ void plugin_get_metadata(void *ptr, grpc_auth_metadata_context context, /* call the user callback function */ zend_call_function(state->fci, state->fci_cache TSRMLS_CC); - grpc_status_code code = GRPC_STATUS_OK; + *num_creds_md = 0; + *status = GRPC_STATUS_OK; + *error_details = NULL; + grpc_metadata_array metadata; - bool cleanup = true; if (retval == NULL || Z_TYPE_P(retval) != IS_ARRAY) { - cleanup = false; - code = GRPC_STATUS_INVALID_ARGUMENT; - } else if (!create_metadata_array(retval, &metadata)) { - code = GRPC_STATUS_INVALID_ARGUMENT; + *status = GRPC_STATUS_INVALID_ARGUMENT; + return true; // Synchronous return. + } + if (!create_metadata_array(retval, &metadata)) { + *status = GRPC_STATUS_INVALID_ARGUMENT; + return true; // Synchronous return. } if (retval != NULL) { @@ -197,14 +204,24 @@ void plugin_get_metadata(void *ptr, grpc_auth_metadata_context context, #endif } - /* Pass control back to core */ - cb(user_data, metadata.metadata, metadata.count, code, NULL); - if (cleanup) { - for (int i = 0; i < metadata.count; i++) { + if (metadata.count > GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX) { + *status = GRPC_STATUS_INTERNAL; + *error_details = gpr_strdup( + "PHP plugin credentials returned too many metadata entries"); + for (size_t i = 0; i < metadata.count; i++) { + // TODO(stanleycheung): Why don't we need to unref the key here? grpc_slice_unref(metadata.metadata[i].value); } - grpc_metadata_array_destroy(&metadata); + } else { + // Return data to core. + *num_creds_md = metadata.count; + for (size_t i = 0; i < metadata.count; ++i) { + creds_md[i] = metadata.metadata[i]; + } } + + grpc_metadata_array_destroy(&metadata); + return true; // Synchronous return. } /* Cleanup function for plugin creds API */ diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi index a0e69dd613..98f306feb7 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi @@ -49,8 +49,11 @@ cdef class AuthMetadataContext: cdef grpc_auth_metadata_context context -cdef void plugin_get_metadata( +cdef int plugin_get_metadata( void *state, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, void *user_data) with gil + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) with gil cdef void plugin_destroy_c_plugin_state(void *state) with gil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index 98d7a9820d..36ae151b9d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -122,9 +122,13 @@ cdef class AuthMetadataContext: grpc_shutdown() -cdef void plugin_get_metadata( +cdef int plugin_get_metadata( void *state, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, void *user_data) with gil: + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) with gil: +# FIXME: force the python code to run in a separate thread called_flag = [False] def python_callback( Metadata metadata, grpc_status_code status, @@ -141,6 +145,7 @@ cdef void plugin_get_metadata( if not called_flag[0]: cb(user_data, Metadata([]).c_metadata_array.metadata, 0, StatusCode.unknown, traceback.format_exc().encode()) + return 0 # Asynchronous return cdef void plugin_destroy_c_plugin_state(void *state) with gil: cpython.Py_DECREF(state) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 5950bfa0e6..7ca8fd3df0 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -461,9 +461,12 @@ cdef extern from "grpc/grpc_security.h": grpc_status_code status, const char *error_details) ctypedef struct grpc_metadata_credentials_plugin: - void (*get_metadata)( + int (*get_metadata)( void *state, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, void *user_data) + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) void (*destroy)(void *state) void *state const char *type diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c index 049a869bdc..4214a0811b 100644 --- a/src/ruby/ext/grpc/rb_call_credentials.c +++ b/src/ruby/ext/grpc/rb_call_credentials.c @@ -112,9 +112,12 @@ static void grpc_rb_call_credentials_callback_with_gil(void *param) { gpr_free(params); } -static void grpc_rb_call_credentials_plugin_get_metadata( +static int grpc_rb_call_credentials_plugin_get_metadata( void *state, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, void *user_data) { + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) { callback_params *params = gpr_malloc(sizeof(callback_params)); params->get_metadata = (VALUE)state; params->context = context; @@ -123,6 +126,7 @@ static void grpc_rb_call_credentials_plugin_get_metadata( grpc_rb_event_queue_enqueue(grpc_rb_call_credentials_callback_with_gil, (void *)(params)); + return 0; // Async return. } static void grpc_rb_call_credentials_plugin_destroy(void *state) { diff --git a/test/core/security/credentials_test.c b/test/core/security/credentials_test.c index 441c431135..5ac58070c8 100644 --- a/test/core/security/credentials_test.c +++ b/test/core/security/credentials_test.c @@ -1036,39 +1036,46 @@ typedef enum { static const expected_md plugin_md[] = {{"foo", "bar"}, {"hi", "there"}}; -static void plugin_get_metadata_success(void *state, - grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, - void *user_data) { - size_t i; - grpc_metadata md[GPR_ARRAY_SIZE(plugin_md)]; - plugin_state *s = (plugin_state *)state; +static int plugin_get_metadata_success( + void *state, grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) { GPR_ASSERT(strcmp(context.service_url, test_service_url) == 0); GPR_ASSERT(strcmp(context.method_name, test_method) == 0); GPR_ASSERT(context.channel_auth_context == NULL); GPR_ASSERT(context.reserved == NULL); + GPR_ASSERT(GPR_ARRAY_SIZE(plugin_md) < + GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX); + plugin_state *s = (plugin_state *)state; *s = PLUGIN_GET_METADATA_CALLED_STATE; - for (i = 0; i < GPR_ARRAY_SIZE(plugin_md); i++) { - memset(&md[i], 0, sizeof(grpc_metadata)); - md[i].key = grpc_slice_from_copied_string(plugin_md[i].key); - md[i].value = grpc_slice_from_copied_string(plugin_md[i].value); + for (size_t i = 0; i < GPR_ARRAY_SIZE(plugin_md); ++i) { + memset(&creds_md[i], 0, sizeof(grpc_metadata)); + creds_md[i].key = grpc_slice_from_copied_string(plugin_md[i].key); + creds_md[i].value = grpc_slice_from_copied_string(plugin_md[i].value); } - cb(user_data, md, GPR_ARRAY_SIZE(md), GRPC_STATUS_OK, NULL); + *num_creds_md = GPR_ARRAY_SIZE(plugin_md); + return true; // Synchronous return. } static const char *plugin_error_details = "Could not get metadata for plugin."; -static void plugin_get_metadata_failure(void *state, - grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, - void *user_data) { - plugin_state *s = (plugin_state *)state; +static int plugin_get_metadata_failure( + void *state, grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) { GPR_ASSERT(strcmp(context.service_url, test_service_url) == 0); GPR_ASSERT(strcmp(context.method_name, test_method) == 0); GPR_ASSERT(context.channel_auth_context == NULL); GPR_ASSERT(context.reserved == NULL); + plugin_state *s = (plugin_state *)state; *s = PLUGIN_GET_METADATA_CALLED_STATE; - cb(user_data, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, plugin_error_details); + *status = GRPC_STATUS_UNAUTHENTICATED; + *error_details = gpr_strdup(plugin_error_details); + return true; // Synchronous return. } static void plugin_destroy(void *state) { -- cgit v1.2.3 From c65c8773c0f0d27697c4b260be76167cd212ec92 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 20 Sep 2017 17:19:01 -0700 Subject: Remove the entire line --- include/grpc/impl/codegen/port_platform.h | 1 - 1 file changed, 1 deletion(-) (limited to 'include') diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h index 8d8dcee3b0..24581fa6ef 100644 --- a/include/grpc/impl/codegen/port_platform.h +++ b/include/grpc/impl/codegen/port_platform.h @@ -183,7 +183,6 @@ #define _BSD_SOURCE #endif #if TARGET_OS_IPHONE -#define GPR_FORBID_UNREACHABLE_CODE 0 #define GPR_PLATFORM_STRING "ios" #define GPR_CPU_IPHONE 1 #define GPR_PTHREAD_TLS 1 -- cgit v1.2.3 From 2046d0b3c50e555f339062b4fba298e8af8b3a1f Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 19 Sep 2017 15:30:11 -0700 Subject: Add unary call mode to GenericStub to allow generic RPC with 1 CQ trip --- include/grpc++/generic/generic_stub.h | 10 ++++++ src/cpp/client/generic_stub.cc | 10 ++++++ test/cpp/end2end/generic_end2end_test.cc | 56 ++++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+) (limited to 'include') diff --git a/include/grpc++/generic/generic_stub.h b/include/grpc++/generic/generic_stub.h index 2b3ff59ea2..d5064318cf 100644 --- a/include/grpc++/generic/generic_stub.h +++ b/include/grpc++/generic/generic_stub.h @@ -20,6 +20,7 @@ #define GRPCXX_GENERIC_GENERIC_STUB_H #include +#include #include namespace grpc { @@ -27,6 +28,7 @@ namespace grpc { class CompletionQueue; typedef ClientAsyncReaderWriter GenericClientAsyncReaderWriter; +typedef ClientAsyncResponseReader GenericClientAsyncResponseReader; /// Generic stubs provide a type-unsafe interface to call gRPC methods /// by name. @@ -51,6 +53,14 @@ class GenericStub final { std::unique_ptr PrepareCall( ClientContext* context, const grpc::string& method, CompletionQueue* cq); + /// Setup a unary call to a named method \a method using \a context, and don't + /// start it. Let it be started explicitly with StartCall. + /// The return value only indicates whether or not registration of the call + /// succeeded (i.e. the call won't proceed if the return value is nullptr). + std::unique_ptr PrepareUnaryCall( + ClientContext* context, const grpc::string& method, + const ByteBuffer& request, CompletionQueue* cq); + private: std::shared_ptr channel_; }; diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index de2e449fe8..693b8bea56 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -47,4 +47,14 @@ std::unique_ptr GenericStub::PrepareCall( return CallInternal(channel_.get(), context, method, cq, false, nullptr); } +// setup a unary call to a named method +std::unique_ptr GenericStub::PrepareUnaryCall( + ClientContext* context, const grpc::string& method, + const ByteBuffer& request, CompletionQueue* cq) { + return std::unique_ptr( + GenericClientAsyncResponseReader::Create( + channel_.get(), cq, RpcMethod(method.c_str(), RpcMethod::NORMAL_RPC), + context, request, false)); +} + } // namespace grpc diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index b9e6e18ca7..33b35108d2 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -196,6 +196,62 @@ TEST_F(GenericEnd2endTest, SequentialRpcs) { SendRpc(10); } +TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) { + ResetStub(); + const int num_rpcs = 10; + const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo"); + for (int i = 0; i < num_rpcs; i++) { + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + GenericServerContext srv_ctx; + GenericServerAsyncReaderWriter stream(&srv_ctx); + + // The string needs to be long enough to test heap-based slice. + send_request.set_message("Hello world. Hello world. Hello world."); + + std::unique_ptr cli_send_buffer = + SerializeToByteBuffer(&send_request); + std::unique_ptr call = + generic_stub_->PrepareUnaryCall(&cli_ctx, kMethodName, + *cli_send_buffer.get(), &cli_cq_); + call->StartCall(); + ByteBuffer cli_recv_buffer; + call->Finish(&cli_recv_buffer, &recv_status, tag(1)); + + generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), + srv_cq_.get(), tag(4)); + + verify_ok(srv_cq_.get(), 4, true); + EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); + EXPECT_EQ(kMethodName, srv_ctx.method()); + + ByteBuffer srv_recv_buffer; + stream.Read(&srv_recv_buffer, tag(5)); + server_ok(5); + EXPECT_TRUE(ParseFromByteBuffer(&srv_recv_buffer, &recv_request)); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + std::unique_ptr srv_send_buffer = + SerializeToByteBuffer(&send_response); + stream.Write(*srv_send_buffer, tag(6)); + server_ok(6); + + stream.Finish(Status::OK, tag(7)); + server_ok(7); + + client_ok(1); + EXPECT_TRUE(ParseFromByteBuffer(&cli_recv_buffer, &recv_response)); + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); + } +} + // One ping, one pong. TEST_F(GenericEnd2endTest, SimpleBidiStreaming) { ResetStub(); -- cgit v1.2.3 From 60092b03d21f66d721d68af11b8d0915bc81b1ab Mon Sep 17 00:00:00 2001 From: Eric Dobson Date: Sun, 24 Sep 2017 11:09:05 -0700 Subject: Fix docs on grpc_call_ref --- include/grpc/grpc.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include') diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index fab7d438aa..1de289fba4 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -313,7 +313,7 @@ GRPCAPI grpc_call_error grpc_call_cancel_with_status(grpc_call *call, void *reserved); /** Ref a call. - THREAD SAFETY: grpc_call_unref is thread-compatible */ + THREAD SAFETY: grpc_call_ref is thread-compatible */ GRPCAPI void grpc_call_ref(grpc_call *call); /** Unref a call. -- cgit v1.2.3 From fe40815682f7da0a7d6f09be2253d6660ef8e7a5 Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Wed, 27 Sep 2017 12:27:20 -0700 Subject: Readd grpclb fallback --- include/grpc++/support/channel_arguments.h | 6 + include/grpc/impl/codegen/grpc_types.h | 6 +- .../client_channel/lb_policy/grpclb/grpclb.c | 254 ++++++++++++++++----- .../ext/filters/client_channel/lb_policy_factory.c | 2 +- .../ext/filters/client_channel/lb_policy_factory.h | 2 +- src/cpp/common/channel_arguments.cc | 4 + test/cpp/end2end/grpclb_end2end_test.cc | 180 ++++++++++++++- 7 files changed, 384 insertions(+), 70 deletions(-) (limited to 'include') diff --git a/include/grpc++/support/channel_arguments.h b/include/grpc++/support/channel_arguments.h index 7b6befeaf1..9dc505f008 100644 --- a/include/grpc++/support/channel_arguments.h +++ b/include/grpc++/support/channel_arguments.h @@ -64,6 +64,12 @@ class ChannelArguments { /// Set the compression algorithm for the channel. void SetCompressionAlgorithm(grpc_compression_algorithm algorithm); + /// Set the grpclb fallback timeout (in ms) for the channel. If this amount + /// of time has passed but we have not gotten any non-empty \a serverlist from + /// the balancer, we will fall back to use the backend address(es) returned by + /// the resolver. + void SetGrpclbFallbackTimeout(int fallback_timeout); + /// Set the socket mutator for the channel. void SetSocketMutator(grpc_socket_mutator* mutator); diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 90f03f49a3..65463bb33b 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -288,7 +288,11 @@ typedef struct { "grpc.experimental.tcp_max_read_chunk_size" /* Timeout in milliseconds to use for calls to the grpclb load balancer. If 0 or unset, the balancer calls will have no deadline. */ -#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_timeout_ms" +#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms" +/* Timeout in milliseconds to wait for the serverlist from the grpclb load + balancer before using fallback backend addresses from the resolver. + If 0, fallback will never be used. */ +#define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS "grpc.grpclb_fallback_timeout_ms" /** If non-zero, grpc server's cronet compression workaround will be enabled */ #define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \ "grpc.workaround.cronet_compression" diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 05a106f214..8dc81b46d1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -123,6 +123,7 @@ #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2 +#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000 grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb"); @@ -299,6 +300,10 @@ typedef struct glb_lb_policy { /** timeout in milliseconds for the LB call. 0 means no deadline. */ int lb_call_timeout_ms; + /** timeout in milliseconds for before using fallback backend addresses. + * 0 means not using fallback. */ + int lb_fallback_timeout_ms; + /** for communicating with the LB server */ grpc_channel *lb_channel; @@ -325,6 +330,9 @@ typedef struct glb_lb_policy { * Otherwise, we delegate to the RR policy. */ size_t serverlist_index; + /** stores the backend addresses from the resolver */ + grpc_lb_addresses *fallback_backend_addresses; + /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; @@ -345,6 +353,9 @@ typedef struct glb_lb_policy { /** is \a lb_call_retry_timer active? */ bool retry_timer_active; + /** is \a lb_fallback_timer active? */ + bool fallback_timer_active; + /** called upon changes to the LB channel's connectivity. */ grpc_closure lb_channel_on_connectivity_changed; @@ -364,6 +375,9 @@ typedef struct glb_lb_policy { /* LB call retry timer callback. */ grpc_closure lb_on_call_retry; + /* LB fallback timer callback. */ + grpc_closure lb_on_fallback; + grpc_call *lb_call; /* streaming call to the LB server, */ grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */ @@ -387,6 +401,9 @@ typedef struct glb_lb_policy { /** LB call retry timer */ grpc_timer lb_call_retry_timer; + /** LB fallback timer */ + grpc_timer lb_fallback_timer; + bool seen_initial_response; /* Stats for client-side load reporting. Should be unreffed and @@ -532,6 +549,32 @@ static grpc_lb_addresses *process_serverlist_locked( return lb_addresses; } +/* Returns the backend addresses extracted from the given addresses */ +static grpc_lb_addresses *extract_backend_addresses_locked( + grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses) { + /* first pass: count the number of backend addresses */ + size_t num_backends = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (!addresses->addresses[i].is_balancer) { + ++num_backends; + } + } + /* second pass: actually populate the addresses and (empty) LB tokens */ + grpc_lb_addresses *backend_addresses = + grpc_lb_addresses_create(num_backends, &lb_token_vtable); + size_t num_copied = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (addresses->addresses[i].is_balancer) continue; + const grpc_resolved_address *addr = &addresses->addresses[i].address; + grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, + addr->len, false /* is_balancer */, + NULL /* balancer_name */, + (void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); + ++num_copied; + } + return backend_addresses; +} + static void update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_connectivity_state rr_state, grpc_error *rr_state_error) { @@ -599,35 +642,38 @@ static bool pick_from_internal_rr_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, const grpc_lb_policy_pick_args *pick_args, bool force_async, grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { - // Look at the index into the serverlist to see if we should drop this call. - grpc_grpclb_server *server = - glb_policy->serverlist->servers[glb_policy->serverlist_index++]; - if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { - glb_policy->serverlist_index = 0; // Wrap-around. - } - if (server->drop) { - // Not using the RR policy, so unref it. - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", - (intptr_t)wc_arg->rr_policy); + // Check for drops if we are not using fallback backend addresses. + if (glb_policy->serverlist != NULL) { + // Look at the index into the serverlist to see if we should drop this call. + grpc_grpclb_server *server = + glb_policy->serverlist->servers[glb_policy->serverlist_index++]; + if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { + glb_policy->serverlist_index = 0; // Wrap-around. } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); - // Update client load reporting stats to indicate the number of - // dropped calls. Note that we have to do this here instead of in - // the client_load_reporting filter, because we do not create a - // subchannel call (and therefore no client_load_reporting filter) - // for dropped calls. - grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token, - wc_arg->client_stats); - grpc_grpclb_client_stats_unref(wc_arg->client_stats); - if (force_async) { - GPR_ASSERT(wc_arg->wrapped_closure != NULL); - GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + if (server->drop) { + // Not using the RR policy, so unref it. + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); + // Update client load reporting stats to indicate the number of + // dropped calls. Note that we have to do this here instead of in + // the client_load_reporting filter, because we do not create a + // subchannel call (and therefore no client_load_reporting filter) + // for dropped calls. + grpc_grpclb_client_stats_add_call_dropped_locked( + server->load_balance_token, wc_arg->client_stats); + grpc_grpclb_client_stats_unref(wc_arg->client_stats); + if (force_async) { + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + gpr_free(wc_arg->free_when_done); + return false; + } gpr_free(wc_arg->free_when_done); - return false; + return true; } - gpr_free(wc_arg->free_when_done); - return true; } // Pick via the RR policy. const bool pick_done = grpc_lb_policy_pick_locked( @@ -665,8 +711,18 @@ static bool pick_from_internal_rr_locked( static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { - grpc_lb_addresses *addresses = - process_serverlist_locked(exec_ctx, glb_policy->serverlist); + grpc_lb_addresses *addresses; + if (glb_policy->serverlist != NULL) { + GPR_ASSERT(glb_policy->serverlist->num_servers > 0); + addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist); + } else { + // If rr_handover_locked() is invoked when we haven't received any + // serverlist from the balancer, we use the fallback backends returned by + // the resolver. Note that the fallback backend list may be empty, in which + // case the new round_robin policy will keep the requested picks pending. + GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); + addresses = grpc_lb_addresses_copy(glb_policy->fallback_backend_addresses); + } GPR_ASSERT(addresses != NULL); grpc_lb_policy_args *args = (grpc_lb_policy_args *)gpr_zalloc(sizeof(*args)); args->client_channel_factory = glb_policy->cc_factory; @@ -772,8 +828,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, /* glb_policy->rr_policy may be NULL (initial handover) */ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { - GPR_ASSERT(glb_policy->serverlist != NULL && - glb_policy->serverlist->num_servers > 0); if (glb_policy->shutting_down) return; grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy); GPR_ASSERT(args != NULL); @@ -922,6 +976,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->serverlist != NULL) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } + if (glb_policy->fallback_backend_addresses != NULL) { + grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); + } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); grpc_subchannel_index_unref(); if (glb_policy->pending_update_args != NULL) { @@ -1063,10 +1120,28 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } +static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy); static void start_picking_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { + /* start a timer to fall back */ + if (glb_policy->lb_fallback_timeout_ms > 0 && + glb_policy->serverlist == NULL && !glb_policy->fallback_timer_active) { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec deadline = gpr_time_add( + now, + gpr_time_from_millis(glb_policy->lb_fallback_timeout_ms, GPR_TIMESPAN)); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); + GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, + glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); + glb_policy->fallback_timer_active = true; + grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline, + &glb_policy->lb_on_fallback, now); + } + glb_policy->started_picking = true; gpr_backoff_reset(&glb_policy->lb_call_backoff_state); query_for_backends_locked(exec_ctx, glb_policy); @@ -1545,6 +1620,15 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, if (glb_policy->serverlist != NULL) { /* dispose of the old serverlist */ grpc_grpclb_destroy_serverlist(glb_policy->serverlist); + } else { + /* or dispose of the fallback */ + grpc_lb_addresses_destroy(exec_ctx, + glb_policy->fallback_backend_addresses); + glb_policy->fallback_backend_addresses = NULL; + if (glb_policy->fallback_timer_active) { + grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer); + glb_policy->fallback_timer_active = false; + } } /* and update the copy in the glb_lb_policy instance. This * serverlist instance will be destroyed either upon the next @@ -1555,9 +1639,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, } } else { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "Received empty server list. Picks will stay pending until " - "a response with > 0 servers is received"); + gpr_log(GPR_INFO, "Received empty server list, ignoring."); } grpc_grpclb_destroy_serverlist(serverlist); } @@ -1592,6 +1674,27 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, } } +static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = (glb_lb_policy *)arg; + glb_policy->fallback_timer_active = false; + /* If we receive a serverlist after the timer fires but before this callback + * actually runs, don't fall back. */ + if (glb_policy->serverlist == NULL) { + if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, + "Falling back to use backends from resolver (grpclb %p)", + (void *)glb_policy); + } + GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); + rr_handover_locked(exec_ctx, glb_policy); + } + } + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "grpclb_fallback_timer"); +} + static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)arg; @@ -1616,31 +1719,22 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, } } +static void fallback_update_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy, + const grpc_lb_addresses *addresses) { + GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); + grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); + glb_policy->fallback_backend_addresses = + extract_backend_addresses_locked(exec_ctx, addresses); + if (glb_policy->lb_fallback_timeout_ms > 0 && + !glb_policy->fallback_timer_active) { + rr_handover_locked(exec_ctx, glb_policy); + } +} + static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_args *args) { glb_lb_policy *glb_policy = (glb_lb_policy *)policy; - if (glb_policy->updating_lb_channel) { - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "Update already in progress for grpclb %p. Deferring update.", - (void *)glb_policy); - } - if (glb_policy->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, - glb_policy->pending_update_args->args); - gpr_free(glb_policy->pending_update_args); - } - glb_policy->pending_update_args = (grpc_lb_policy_args *)gpr_zalloc( - sizeof(*glb_policy->pending_update_args)); - glb_policy->pending_update_args->client_channel_factory = - args->client_channel_factory; - glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args); - glb_policy->pending_update_args->combiner = args->combiner; - return; - } - - glb_policy->updating_lb_channel = true; - // Propagate update to lb_channel (pick first). const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { @@ -1658,13 +1752,43 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, "ignoring.", (void *)glb_policy); } + return; } const grpc_lb_addresses *addresses = (const grpc_lb_addresses *)arg->value.pointer.p; + + if (glb_policy->serverlist == NULL) { + // If a non-empty serverlist hasn't been received from the balancer, + // propagate the update to fallback_backend_addresses. + fallback_update_locked(exec_ctx, glb_policy, addresses); + } else if (glb_policy->updating_lb_channel) { + // If we have recieved serverlist from the balancer, we need to defer update + // when there is an in-progress one. + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, + "Update already in progress for grpclb %p. Deferring update.", + (void *)glb_policy); + } + if (glb_policy->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, + glb_policy->pending_update_args->args); + gpr_free(glb_policy->pending_update_args); + } + glb_policy->pending_update_args = (grpc_lb_policy_args *)gpr_zalloc( + sizeof(*glb_policy->pending_update_args)); + glb_policy->pending_update_args->client_channel_factory = + args->client_channel_factory; + glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args); + glb_policy->pending_update_args->combiner = args->combiner; + return; + } + + glb_policy->updating_lb_channel = true; GPR_ASSERT(glb_policy->lb_channel != NULL); grpc_channel_args *lb_channel_args = build_lb_channel_args( exec_ctx, addresses, glb_policy->response_generator, args->args); - /* Propagate updates to the LB channel through the fake resolver */ + /* Propagate updates to the LB channel (pick first) through the fake resolver + */ grpc_fake_resolver_response_generator_set_response( exec_ctx, glb_policy->response_generator, lb_channel_args); grpc_channel_args_destroy(exec_ctx, lb_channel_args); @@ -1767,13 +1891,7 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - /* Count the number of gRPC-LB addresses. There must be at least one. - * TODO(roth): For now, we ignore non-balancer addresses, but in the - * future, we may change the behavior such that we fall back to using - * the non-balancer addresses if we cannot reach any balancers. In the - * fallback case, we should use the LB policy indicated by - * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is - * unset, we should default to pick_first). */ + /* Count the number of gRPC-LB addresses. There must be at least one. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { @@ -1809,6 +1927,11 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, glb_policy->lb_call_timeout_ms = grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX}); + arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); + glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer( + arg, (grpc_integer_options){GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, + INT_MAX}); + // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // since we use this to trigger the client_load_reporting filter. grpc_arg new_arg = grpc_channel_arg_string_create( @@ -1817,6 +1940,11 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, glb_policy->args = grpc_channel_args_copy_and_add_and_remove( args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); + /* Extract the backend addresses (may be empty) from the resolver for + * fallback. */ + glb_policy->fallback_backend_addresses = + extract_backend_addresses_locked(exec_ctx, addresses); + /* Create a client channel over them to communicate with a LB service */ glb_policy->response_generator = grpc_fake_resolver_response_generator_create(); diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.c b/src/core/ext/filters/client_channel/lb_policy_factory.c index 4d1405454c..05ab43d0b6 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.c +++ b/src/core/ext/filters/client_channel/lb_policy_factory.c @@ -56,7 +56,7 @@ grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) { } void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index, - void* address, size_t address_len, + const void* address, size_t address_len, bool is_balancer, const char* balancer_name, void* user_data) { GPR_ASSERT(index < addresses->num_addresses); diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h index 9d9fb143df..cf0f8cb615 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.h +++ b/src/core/ext/filters/client_channel/lb_policy_factory.h @@ -73,7 +73,7 @@ grpc_lb_addresses *grpc_lb_addresses_copy(const grpc_lb_addresses *addresses); * \a address is a socket address of length \a address_len. * Takes ownership of \a balancer_name. */ void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index, - void *address, size_t address_len, + const void *address, size_t address_len, bool is_balancer, const char *balancer_name, void *user_data); diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc index f130aecd4b..f89f5f1f03 100644 --- a/src/cpp/common/channel_arguments.cc +++ b/src/cpp/common/channel_arguments.cc @@ -86,6 +86,10 @@ void ChannelArguments::SetCompressionAlgorithm( SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, algorithm); } +void ChannelArguments::SetGrpclbFallbackTimeout(int fallback_timeout) { + SetInt(GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS, fallback_timeout); +} + void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) { if (!mutator) { return; diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 77ed155292..f73a9c1791 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -368,8 +368,9 @@ class GrpclbEnd2endTest : public ::testing::Test { grpc_fake_resolver_response_generator_unref(response_generator_); } - void ResetStub() { + void ResetStub(int fallback_timeout = 0) { ChannelArguments args; + args.SetGrpclbFallbackTimeout(fallback_timeout); args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_); std::ostringstream uri; @@ -470,10 +471,10 @@ class GrpclbEnd2endTest : public ::testing::Test { grpc_exec_ctx_finish(&exec_ctx); } - const std::vector GetBackendPorts() const { + const std::vector GetBackendPorts(const size_t start_index = 0) const { std::vector backend_ports; - for (const auto& bs : backend_servers_) { - backend_ports.push_back(bs.port_); + for (size_t i = start_index; i < backend_servers_.size(); ++i) { + backend_ports.push_back(backend_servers_[i].port_); } return backend_ports; } @@ -642,6 +643,177 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +TEST_F(SingleBalancerTest, Fallback) { + const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor(); + const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); + const size_t kNumBackendInResolution = backends_.size() / 2; + + ResetStub(kFallbackTimeoutMs); + std::vector addresses; + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + for (size_t i = 0; i < kNumBackendInResolution; ++i) { + addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""}); + } + SetNextResolution(addresses); + + // Send non-empty serverlist only after kServerlistDelayMs. + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(kNumBackendInResolution /* start_index */), {}), + kServerlistDelayMs); + + // Wait until all the fallback backends are reachable. + for (size_t i = 0; i < kNumBackendInResolution; ++i) { + WaitForBackend(i); + } + + // The first request. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + CheckRpcSendOk(kNumBackendInResolution); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + + // Fallback is used: each backend returned by the resolver should have + // gotten one request. + for (size_t i = 0; i < kNumBackendInResolution; ++i) { + EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); + } + for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) { + EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); + } + + // Wait until the serverlist reception has been processed and all backends + // in the serverlist are reachable. + for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) { + WaitForBackend(i); + } + + // Send out the second request. + gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); + CheckRpcSendOk(backends_.size() - kNumBackendInResolution); + gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); + + // Serverlist is used: each backend returned by the balancer should + // have gotten one request. + for (size_t i = 0; i < kNumBackendInResolution; ++i) { + EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); + } + for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) { + EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); + } + + balancers_[0]->NotifyDoneWithServerlists(); + // The balancer got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); +} + +TEST_F(SingleBalancerTest, FallbackUpdate) { + const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor(); + const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); + const size_t kNumBackendInResolution = backends_.size() / 3; + const size_t kNumBackendInResolutionUpdate = backends_.size() / 3; + + ResetStub(kFallbackTimeoutMs); + std::vector addresses; + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + for (size_t i = 0; i < kNumBackendInResolution; ++i) { + addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""}); + } + SetNextResolution(addresses); + + // Send non-empty serverlist only after kServerlistDelayMs. + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(kNumBackendInResolution + + kNumBackendInResolutionUpdate /* start_index */), + {}), + kServerlistDelayMs); + + // Wait until all the fallback backends are reachable. + for (size_t i = 0; i < kNumBackendInResolution; ++i) { + WaitForBackend(i); + } + + // The first request. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + CheckRpcSendOk(kNumBackendInResolution); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + + // Fallback is used: each backend returned by the resolver should have + // gotten one request. + for (size_t i = 0; i < kNumBackendInResolution; ++i) { + EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); + } + for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) { + EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); + } + + addresses.clear(); + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + for (size_t i = kNumBackendInResolution; + i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) { + addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""}); + } + SetNextResolution(addresses); + + // Wait until the resolution update has been processed and all the new + // fallback backends are reachable. + for (size_t i = kNumBackendInResolution; + i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) { + WaitForBackend(i); + } + + // Send out the second request. + gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); + CheckRpcSendOk(kNumBackendInResolutionUpdate); + gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); + + // The resolution update is used: each backend in the resolution update should + // have gotten one request. + for (size_t i = 0; i < kNumBackendInResolution; ++i) { + EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); + } + for (size_t i = kNumBackendInResolution; + i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) { + EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); + } + for (size_t i = kNumBackendInResolution + kNumBackendInResolutionUpdate; + i < backends_.size(); ++i) { + EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); + } + + // Wait until the serverlist reception has been processed and all backends + // in the serverlist are reachable. + for (size_t i = kNumBackendInResolution + kNumBackendInResolutionUpdate; + i < backends_.size(); ++i) { + WaitForBackend(i); + } + + // Send out the third request. + gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH =========="); + CheckRpcSendOk(backends_.size() - kNumBackendInResolution - + kNumBackendInResolutionUpdate); + gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH =========="); + + // Serverlist is used: each backend returned by the balancer should + // have gotten one request. + for (size_t i = 0; + i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) { + EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); + } + for (size_t i = kNumBackendInResolution + kNumBackendInResolutionUpdate; + i < backends_.size(); ++i) { + EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); + } + + balancers_[0]->NotifyDoneWithServerlists(); + // The balancer got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); +} + TEST_F(SingleBalancerTest, BackendsRestart) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( -- cgit v1.2.3 From 61b26f97b04b4fec82d6b9ea110ad7743a0ab178 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 28 Sep 2017 08:40:03 -0700 Subject: Code review changes. --- include/grpc/grpc_security.h | 2 +- src/cpp/client/secure_credentials.cc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'include') diff --git a/include/grpc/grpc_security.h b/include/grpc/grpc_security.h index 42e2ab3111..95b1447935 100644 --- a/include/grpc/grpc_security.h +++ b/include/grpc/grpc_security.h @@ -249,7 +249,7 @@ typedef struct { void *reserved; } grpc_auth_metadata_context; -/** Maximum number of credentials returnable by a credentials plugin via +/** Maximum number of metadata entries returnable by a credentials plugin via a synchronous return. */ #define GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX 4 diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 6c06e45326..e2bd1f7747 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -178,12 +178,12 @@ int MetadataCredentialsPluginWrapper::GetMetadata( w->thread_pool_->Add( std::bind(&MetadataCredentialsPluginWrapper::InvokePlugin, w, context, cb, user_data, nullptr, nullptr, nullptr, nullptr)); - return false; + return 0; } else { // Synchronous return. w->InvokePlugin(context, cb, user_data, creds_md, num_creds_md, status, error_details); - return true; + return 1; } } -- cgit v1.2.3 From 30f1d0fe79d45c8d89a654faf00431366d75ba84 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Thu, 28 Sep 2017 14:07:32 -0700 Subject: Specify min windows version as Vista for Python --- include/grpc/impl/codegen/port_platform.h | 4 ---- setup.py | 4 +--- src/core/lib/iomgr/socket_utils_windows.c | 4 ---- 3 files changed, 1 insertion(+), 11 deletions(-) (limited to 'include') diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h index 7e5f85253e..0f4316c954 100644 --- a/include/grpc/impl/codegen/port_platform.h +++ b/include/grpc/impl/codegen/port_platform.h @@ -291,10 +291,6 @@ #endif #ifdef _MSC_VER -#ifdef _PYTHON_MSVC -// The Python 3.5 Windows runtime is missing InetNtop -#define GPR_WIN_INET_NTOP -#endif // _PYTHON_MSVC #if _MSC_VER < 1700 typedef __int8 int8_t; typedef __int16 int16_t; diff --git a/setup.py b/setup.py index 12882413ce..90c9316b0d 100644 --- a/setup.py +++ b/setup.py @@ -110,8 +110,6 @@ if EXTRA_ENV_COMPILE_ARGS is None: EXTRA_ENV_COMPILE_ARGS += ' -D_ftime=_ftime32 -D_timeb=__timeb32 -D_ftime_s=_ftime32_s' else: EXTRA_ENV_COMPILE_ARGS += ' -D_ftime=_ftime64 -D_timeb=__timeb64' - elif 'win32' in sys.platform: - EXTRA_ENV_COMPILE_ARGS += ' -D_PYTHON_MSVC' elif "linux" in sys.platform: EXTRA_ENV_COMPILE_ARGS += ' -std=c++11 -std=gnu99 -fvisibility=hidden -fno-wrapv -fno-exceptions' elif "darwin" in sys.platform: @@ -163,7 +161,7 @@ if "win32" in sys.platform: # TODO(zyc): Re-enble c-ares on x64 and x86 windows after fixing the # ares_library_init compilation issue DEFINE_MACROS += (('WIN32_LEAN_AND_MEAN', 1), ('CARES_STATICLIB', 1), - ('GRPC_ARES', 0),) + ('GRPC_ARES', 0), ('NTDDI_VERSION', 0x06000000),) if '64bit' in platform.architecture()[0]: DEFINE_MACROS += (('MS_WIN64', 1),) elif sys.version_info >= (3, 5): diff --git a/src/core/lib/iomgr/socket_utils_windows.c b/src/core/lib/iomgr/socket_utils_windows.c index 2732c159aa..6e85e4b61f 100644 --- a/src/core/lib/iomgr/socket_utils_windows.c +++ b/src/core/lib/iomgr/socket_utils_windows.c @@ -26,12 +26,8 @@ #include const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size) { -#ifdef GPR_WIN_INET_NTOP - return inet_ntop(af, src, dst, size); -#else /* Windows InetNtopA wants a mutable ip pointer */ return InetNtopA(af, (void *)src, dst, size); -#endif /* GPR_WIN_INET_NTOP */ } #endif /* GRPC_WINDOWS_SOCKETUTILS */ -- cgit v1.2.3