diff options
author | David Garcia Quintas <dgq@google.com> | 2016-05-18 10:53:14 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-05-18 10:53:14 -0700 |
commit | f1945f2a67575d3e3a5d9f59862de9f412cc776f (patch) | |
tree | 0e3c1f077a5ceee1df539ee69360b4e4f7436b5c | |
parent | 3e71f774c8c004d97fea6d48dbb89341c71195ad (diff) | |
parent | fcbe7daf832dcb616fc93ca59c3b1aab279f510e (diff) |
Merge branch 'master' of github.com:grpc/grpc into compression_incoming_checks
30 files changed, 348 insertions, 145 deletions
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index d081b7d9c5..d457f03fa6 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -329,8 +329,11 @@ class CallOpGenericRecvMessage { template <class R> void RecvMessage(R* message) { - deserialize_.reset( - new CallOpGenericRecvMessageHelper::DeserializeFuncType<R>(message)); + // Use an explicit base class pointer to avoid resolution error in the + // following unique_ptr::reset for some old implementations. + CallOpGenericRecvMessageHelper::DeserializeFunc* func = + new CallOpGenericRecvMessageHelper::DeserializeFuncType<R>(message); + deserialize_.reset(func); } bool got_message; diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 5510c79b18..0e548c61b8 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -47,7 +47,7 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" -int grpc_compress_filter_trace = 0; +int grpc_compression_trace = 0; typedef struct call_data { gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ @@ -171,7 +171,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, did_compress = grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { - if (grpc_compress_filter_trace) { + if (grpc_compression_trace) { char *algo_name; const size_t before_size = calld->slices.length; const size_t after_size = tmp.length; @@ -185,12 +185,14 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, gpr_slice_buffer_swap(&calld->slices, &tmp); calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { - if (grpc_compress_filter_trace) { + if (grpc_compression_trace) { char *algo_name; GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, &algo_name)); - gpr_log(GPR_DEBUG, "Algorithm '%s' enabled but decided not to compress.", - algo_name); + gpr_log( + GPR_DEBUG, + "Algorithm '%s' enabled but decided not to compress. Input size: %d", + algo_name, calld->slices.length); } } diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h index cf5879d82e..0ce5d08837 100644 --- a/src/core/lib/channel/compress_filter.h +++ b/src/core/lib/channel/compress_filter.h @@ -38,7 +38,7 @@ #define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request" -extern int grpc_compress_filter_trace; +extern int grpc_compression_trace; /** Compression filter for outgoing data. * diff --git a/src/core/lib/surface/byte_buffer_reader.c b/src/core/lib/surface/byte_buffer_reader.c index 809fd5f1fa..c97079f638 100644 --- a/src/core/lib/surface/byte_buffer_reader.c +++ b/src/core/lib/surface/byte_buffer_reader.c @@ -62,12 +62,19 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, case GRPC_BB_RAW: gpr_slice_buffer_init(&decompressed_slices_buffer); if (is_compressed(reader->buffer_in)) { - grpc_msg_decompress(reader->buffer_in->data.raw.compression, - &reader->buffer_in->data.raw.slice_buffer, - &decompressed_slices_buffer); - reader->buffer_out = - grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices, - decompressed_slices_buffer.count); + if (grpc_msg_decompress(reader->buffer_in->data.raw.compression, + &reader->buffer_in->data.raw.slice_buffer, + &decompressed_slices_buffer) == 0) { + gpr_log(GPR_ERROR, + "Unexpected error decompressing data for algorithm with enum " + "value '%d'. Reading data as if it were uncompressed.", + reader->buffer_in->data.raw.compression); + reader->buffer_out = reader->buffer_in; + } else { /* all fine */ + reader->buffer_out = + grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices, + decompressed_slices_buffer.count); + } gpr_slice_buffer_destroy(&decompressed_slices_buffer); } else { /* not compressed, use the input buffer as output */ reader->buffer_out = reader->buffer_in; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 99bb4798aa..b5469e1ff9 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -267,6 +267,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, call->channel = channel; call->cq = cq; call->parent = parent_call; + /* Always support no compression */ + GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = server_transport_data == NULL; if (call->is_client) { GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); @@ -414,6 +416,7 @@ static void set_status_code(grpc_call *call, status_source source, static void set_compression_algorithm(grpc_call *call, grpc_compression_algorithm algo) { + GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT); call->compression_algorithm = algo; } @@ -888,12 +891,16 @@ static uint32_t decode_status(grpc_mdelem *md) { return status; } -static uint32_t decode_compression(grpc_mdelem *md) { +static grpc_compression_algorithm decode_compression(grpc_mdelem *md) { grpc_compression_algorithm algorithm = grpc_compression_algorithm_from_mdstr(md->value); if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); - gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); + gpr_log(GPR_ERROR, + "Invalid incoming compression algorithm: '%s'. Interpreting " + "incoming data as uncompressed.", + md_c_str); + return GRPC_COMPRESS_NONE; } return algorithm; } @@ -1136,6 +1143,7 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; + /* validate call->compression_algorithm */ if (call->compression_algorithm != GRPC_COMPRESS_NONE) { const grpc_compression_algorithm algo = call->compression_algorithm; char *error_msg = NULL; @@ -1161,6 +1169,23 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, } gpr_free(error_msg); } + + /* make sure the received grpc-encoding is amongst the ones listed in + * grpc-accept-encoding */ + GPR_ASSERT(call->encodings_accepted_by_peer != 0); + if (!GPR_BITGET(call->encodings_accepted_by_peer, + call->compression_algorithm)) { + extern int grpc_compression_trace; + if (grpc_compression_trace) { + char *algo_name; + grpc_compression_algorithm_name(call->compression_algorithm, &algo_name); + gpr_log(GPR_ERROR, + "Compression algorithm (grpc-encoding = '%s') not present in " + "the bitset of accepted encodings (grpc-accept-encodings: " + "'0x%x')", + algo_name, call->encodings_accepted_by_peer); + } + } } static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, @@ -1568,7 +1593,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, grpc_call_error err; GRPC_API_TRACE( - "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)", + "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, " + "reserved=%p)", 5, (call, ops, (unsigned long)nops, tag, reserved)); if (reserved != NULL) { diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 57c6897626..1c8b709015 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -164,7 +164,7 @@ void grpc_init(void) { grpc_register_tracer("channel_stack_builder", &grpc_trace_channel_stack_builder); grpc_register_tracer("http1", &grpc_http1_trace); - grpc_register_tracer("compression", &grpc_compress_filter_trace); + grpc_register_tracer("compression", &grpc_compression_trace); grpc_security_pre_init(); grpc_iomgr_init(); grpc_executor_init(); diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index a2c1c08169..884130e7d4 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -89,7 +89,7 @@ zend_object_value create_wrapped_grpc_call(zend_class_entry *class_type /* Wraps a grpc_call struct in a PHP object. Owned indicates whether the struct should be destroyed at the end of the object's lifecycle */ -zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned) { +zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned TSRMLS_DC) { zval *call_object; MAKE_STD_ZVAL(call_object); object_init_ex(call_object, grpc_ce_call); @@ -102,7 +102,7 @@ zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned) { /* Creates and returns a PHP array object with the data in a * grpc_metadata_array. Returns NULL on failure */ -zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array) { +zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array TSRMLS_DC) { int count = metadata_array->count; grpc_metadata *elements = metadata_array->metadata; int i; @@ -127,7 +127,7 @@ zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array) { if (zend_hash_find(array_hash, str_key, key_len, (void **)data) == SUCCESS) { if (Z_TYPE_P(*data) != IS_ARRAY) { - zend_throw_exception(zend_exception_get_default(), + zend_throw_exception(zend_exception_get_default(TSRMLS_C), "Metadata hash somehow contains wrong types.", 1 TSRMLS_CC); efree(str_key); @@ -454,7 +454,7 @@ PHP_METHOD(Call, startBatch) { add_property_bool(result, "send_status", true); break; case GRPC_OP_RECV_INITIAL_METADATA: - array = grpc_parse_metadata_array(&recv_metadata); + array = grpc_parse_metadata_array(&recv_metadata TSRMLS_CC); add_property_zval(result, "metadata", array); Z_DELREF_P(array); break; @@ -470,7 +470,7 @@ PHP_METHOD(Call, startBatch) { case GRPC_OP_RECV_STATUS_ON_CLIENT: MAKE_STD_ZVAL(recv_status); object_init(recv_status); - array = grpc_parse_metadata_array(&recv_trailing_metadata); + array = grpc_parse_metadata_array(&recv_trailing_metadata TSRMLS_CC); add_property_zval(recv_status, "metadata", array); Z_DELREF_P(array); add_property_long(recv_status, "code", status); diff --git a/src/php/ext/grpc/call.h b/src/php/ext/grpc/call.h index 73efadae35..36c5f2d272 100644 --- a/src/php/ext/grpc/call.h +++ b/src/php/ext/grpc/call.h @@ -60,11 +60,11 @@ typedef struct wrapped_grpc_call { void grpc_init_call(TSRMLS_D); /* Creates a Call object that wraps the given grpc_call struct */ -zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned); +zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned TSRMLS_DC); /* Creates and returns a PHP associative array of metadata from a C array of * call metadata */ -zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array); +zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array TSRMLS_DC); /* Populates a grpc_metadata_array with the data in a PHP array object. Returns true on success and false on failure */ diff --git a/src/php/ext/grpc/call_credentials.c b/src/php/ext/grpc/call_credentials.c index 285c4e7c85..ec0e6b9181 100644 --- a/src/php/ext/grpc/call_credentials.c +++ b/src/php/ext/grpc/call_credentials.c @@ -83,7 +83,7 @@ zend_object_value create_wrapped_grpc_call_credentials( return retval; } -zval *grpc_php_wrap_call_credentials(grpc_call_credentials *wrapped) { +zval *grpc_php_wrap_call_credentials(grpc_call_credentials *wrapped TSRMLS_DC) { zval *credentials_object; MAKE_STD_ZVAL(credentials_object); object_init_ex(credentials_object, grpc_ce_call_credentials); @@ -122,7 +122,7 @@ PHP_METHOD(CallCredentials, createComposite) { grpc_call_credentials *creds = grpc_composite_call_credentials_create(cred1->wrapped, cred2->wrapped, NULL); - zval *creds_object = grpc_php_wrap_call_credentials(creds); + zval *creds_object = grpc_php_wrap_call_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } @@ -141,7 +141,7 @@ PHP_METHOD(CallCredentials, createFromPlugin) { memset(fci_cache, 0, sizeof(zend_fcall_info_cache)); /* "f" == 1 function */ - if (zend_parse_parameters(ZEND_NUM_ARGS(), "f", fci, + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f", fci, fci_cache, fci->params, fci->param_count) == FAILURE) { @@ -167,7 +167,7 @@ PHP_METHOD(CallCredentials, createFromPlugin) { grpc_call_credentials *creds = grpc_metadata_credentials_create_from_plugin( plugin, NULL); - zval *creds_object = grpc_php_wrap_call_credentials(creds); + zval *creds_object = grpc_php_wrap_call_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } @@ -175,6 +175,8 @@ PHP_METHOD(CallCredentials, createFromPlugin) { void plugin_get_metadata(void *ptr, grpc_auth_metadata_context context, grpc_credentials_plugin_metadata_cb cb, void *user_data) { + TSRMLS_FETCH(); + plugin_state *state = (plugin_state *)ptr; /* prepare to call the user callback function with info from the @@ -192,7 +194,7 @@ void plugin_get_metadata(void *ptr, grpc_auth_metadata_context context, state->fci->retval_ptr_ptr = &retval; /* call the user callback function */ - zend_call_function(state->fci, state->fci_cache); + zend_call_function(state->fci, state->fci_cache TSRMLS_CC); if (Z_TYPE_P(retval) != IS_ARRAY) { zend_throw_exception(spl_ce_InvalidArgumentException, diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c index eba2c81424..9f0431908f 100644 --- a/src/php/ext/grpc/channel.c +++ b/src/php/ext/grpc/channel.c @@ -84,7 +84,7 @@ zend_object_value create_wrapped_grpc_channel(zend_class_entry *class_type return retval; } -void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args) { +void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args TSRMLS_DC) { HashTable *array_hash; HashPosition array_pointer; int args_index; @@ -168,7 +168,7 @@ PHP_METHOD(Channel, __construct) { zend_hash_del(array_hash, "credentials", 12); } } - php_grpc_read_args_array(args_array, &args); + php_grpc_read_args_array(args_array, &args TSRMLS_CC); if (creds == NULL) { channel->wrapped = grpc_insecure_channel_create(target, &args, NULL); } else { diff --git a/src/php/ext/grpc/channel.h b/src/php/ext/grpc/channel.h index 78a16ed0c9..cc5823ee7f 100755 --- a/src/php/ext/grpc/channel.h +++ b/src/php/ext/grpc/channel.h @@ -59,6 +59,6 @@ typedef struct wrapped_grpc_channel { void grpc_init_channel(TSRMLS_D); /* Iterates through a PHP array and populates args with the contents */ -void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args); +void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args TSRMLS_DC); #endif /* NET_GRPC_PHP_GRPC_CHANNEL_H_ */ diff --git a/src/php/ext/grpc/channel_credentials.c b/src/php/ext/grpc/channel_credentials.c index ae9a9897fc..5c537378a6 100644 --- a/src/php/ext/grpc/channel_credentials.c +++ b/src/php/ext/grpc/channel_credentials.c @@ -82,7 +82,7 @@ zend_object_value create_wrapped_grpc_channel_credentials( return retval; } -zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped) { +zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped TSRMLS_DC) { zval *credentials_object; MAKE_STD_ZVAL(credentials_object); object_init_ex(credentials_object, grpc_ce_channel_credentials); @@ -99,7 +99,7 @@ zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped) { */ PHP_METHOD(ChannelCredentials, createDefault) { grpc_channel_credentials *creds = grpc_google_default_credentials_create(); - zval *creds_object = grpc_php_wrap_channel_credentials(creds); + zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } @@ -134,7 +134,7 @@ PHP_METHOD(ChannelCredentials, createSsl) { grpc_channel_credentials *creds = grpc_ssl_credentials_create( pem_root_certs, pem_key_cert_pair.private_key == NULL ? NULL : &pem_key_cert_pair, NULL); - zval *creds_object = grpc_php_wrap_channel_credentials(creds); + zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } @@ -165,7 +165,7 @@ PHP_METHOD(ChannelCredentials, createComposite) { grpc_channel_credentials *creds = grpc_composite_channel_credentials_create(cred1->wrapped, cred2->wrapped, NULL); - zval *creds_object = grpc_php_wrap_channel_credentials(creds); + zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c index ca129e76ca..6df2e4f978 100644 --- a/src/php/ext/grpc/server.c +++ b/src/php/ext/grpc/server.c @@ -111,7 +111,7 @@ PHP_METHOD(Server, __construct) { if (args_array == NULL) { server->wrapped = grpc_server_create(NULL, NULL); } else { - php_grpc_read_args_array(args_array, &args); + php_grpc_read_args_array(args_array, &args TSRMLS_CC); server->wrapped = grpc_server_create(&args, NULL); efree(args.args); } @@ -154,12 +154,12 @@ PHP_METHOD(Server, requestCall) { 1 TSRMLS_CC); goto cleanup; } - add_property_zval(result, "call", grpc_php_wrap_call(call, true)); + add_property_zval(result, "call", grpc_php_wrap_call(call, true TSRMLS_CC)); add_property_string(result, "method", details.method, true); add_property_string(result, "host", details.host, true); add_property_zval(result, "absolute_deadline", - grpc_php_wrap_timeval(details.deadline)); - add_property_zval(result, "metadata", grpc_parse_metadata_array(&metadata)); + grpc_php_wrap_timeval(details.deadline TSRMLS_CC)); + add_property_zval(result, "metadata", grpc_parse_metadata_array(&metadata TSRMLS_CC)); cleanup: grpc_call_details_destroy(&details); grpc_metadata_array_destroy(&metadata); diff --git a/src/php/ext/grpc/server_credentials.c b/src/php/ext/grpc/server_credentials.c index f3951b31fe..505da10a28 100644 --- a/src/php/ext/grpc/server_credentials.c +++ b/src/php/ext/grpc/server_credentials.c @@ -81,7 +81,7 @@ zend_object_value create_wrapped_grpc_server_credentials( return retval; } -zval *grpc_php_wrap_server_credentials(grpc_server_credentials *wrapped) { +zval *grpc_php_wrap_server_credentials(grpc_server_credentials *wrapped TSRMLS_DC) { zval *server_credentials_object; MAKE_STD_ZVAL(server_credentials_object); object_init_ex(server_credentials_object, grpc_ce_server_credentials); @@ -120,7 +120,7 @@ PHP_METHOD(ServerCredentials, createSsl) { grpc_server_credentials *creds = grpc_ssl_server_credentials_create_ex( pem_root_certs, &pem_key_cert_pair, 1, GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE, NULL); - zval *creds_object = grpc_php_wrap_server_credentials(creds); + zval *creds_object = grpc_php_wrap_server_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } diff --git a/src/php/ext/grpc/timeval.c b/src/php/ext/grpc/timeval.c index 4fd069e19a..5e242162a8 100644 --- a/src/php/ext/grpc/timeval.c +++ b/src/php/ext/grpc/timeval.c @@ -72,7 +72,7 @@ zend_object_value create_wrapped_grpc_timeval(zend_class_entry *class_type return retval; } -zval *grpc_php_wrap_timeval(gpr_timespec wrapped) { +zval *grpc_php_wrap_timeval(gpr_timespec wrapped TSRMLS_DC) { zval *timeval_object; MAKE_STD_ZVAL(timeval_object); object_init_ex(timeval_object, grpc_ce_timeval); @@ -122,7 +122,7 @@ PHP_METHOD(Timeval, add) { wrapped_grpc_timeval *other = (wrapped_grpc_timeval *)zend_object_store_get_object(other_obj TSRMLS_CC); zval *sum = - grpc_php_wrap_timeval(gpr_time_add(self->wrapped, other->wrapped)); + grpc_php_wrap_timeval(gpr_time_add(self->wrapped, other->wrapped) TSRMLS_CC); RETURN_DESTROY_ZVAL(sum); } @@ -146,7 +146,7 @@ PHP_METHOD(Timeval, subtract) { wrapped_grpc_timeval *other = (wrapped_grpc_timeval *)zend_object_store_get_object(other_obj TSRMLS_CC); zval *diff = - grpc_php_wrap_timeval(gpr_time_sub(self->wrapped, other->wrapped)); + grpc_php_wrap_timeval(gpr_time_sub(self->wrapped, other->wrapped) TSRMLS_CC); RETURN_DESTROY_ZVAL(diff); } @@ -208,7 +208,7 @@ PHP_METHOD(Timeval, similar) { * @return Timeval The current time */ PHP_METHOD(Timeval, now) { - zval *now = grpc_php_wrap_timeval(gpr_now(GPR_CLOCK_REALTIME)); + zval *now = grpc_php_wrap_timeval(gpr_now(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_DESTROY_ZVAL(now); } @@ -218,7 +218,7 @@ PHP_METHOD(Timeval, now) { */ PHP_METHOD(Timeval, zero) { zval *grpc_php_timeval_zero = - grpc_php_wrap_timeval(gpr_time_0(GPR_CLOCK_REALTIME)); + grpc_php_wrap_timeval(gpr_time_0(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_ZVAL(grpc_php_timeval_zero, false, /* Copy original before returning? */ true /* Destroy original before returning */); @@ -230,7 +230,7 @@ PHP_METHOD(Timeval, zero) { */ PHP_METHOD(Timeval, infFuture) { zval *grpc_php_timeval_inf_future = - grpc_php_wrap_timeval(gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_php_wrap_timeval(gpr_inf_future(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_DESTROY_ZVAL(grpc_php_timeval_inf_future); } @@ -240,7 +240,7 @@ PHP_METHOD(Timeval, infFuture) { */ PHP_METHOD(Timeval, infPast) { zval *grpc_php_timeval_inf_past = - grpc_php_wrap_timeval(gpr_inf_past(GPR_CLOCK_REALTIME)); + grpc_php_wrap_timeval(gpr_inf_past(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_DESTROY_ZVAL(grpc_php_timeval_inf_past); } diff --git a/src/php/ext/grpc/timeval.h b/src/php/ext/grpc/timeval.h index 07cef037cb..7456eb6d58 100755 --- a/src/php/ext/grpc/timeval.h +++ b/src/php/ext/grpc/timeval.h @@ -63,6 +63,6 @@ void grpc_init_timeval(TSRMLS_D); void grpc_shutdown_timeval(TSRMLS_D); /* Creates a Timeval object that wraps the given timeval struct */ -zval *grpc_php_wrap_timeval(gpr_timespec wrapped); +zval *grpc_php_wrap_timeval(gpr_timespec wrapped TSRMLS_DC); #endif /* NET_GRPC_PHP_GRPC_TIMEVAL_H_ */ diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py index b13d8dd9dd..00788bd4cf 100644 --- a/src/python/grpcio/grpc/_adapter/_low.py +++ b/src/python/grpcio/grpc/_adapter/_low.py @@ -195,26 +195,30 @@ class Call(_types.Call): translated_op = cygrpc.operation_send_initial_metadata( cygrpc.Metadata( cygrpc.Metadatum(key, value) - for key, value in op.initial_metadata)) + for key, value in op.initial_metadata), + op.flags) elif op.type == _types.OpType.SEND_MESSAGE: - translated_op = cygrpc.operation_send_message(op.message) + translated_op = cygrpc.operation_send_message(op.message, op.flags) elif op.type == _types.OpType.SEND_CLOSE_FROM_CLIENT: - translated_op = cygrpc.operation_send_close_from_client() + translated_op = cygrpc.operation_send_close_from_client(op.flags) elif op.type == _types.OpType.SEND_STATUS_FROM_SERVER: translated_op = cygrpc.operation_send_status_from_server( cygrpc.Metadata( cygrpc.Metadatum(key, value) for key, value in op.trailing_metadata), op.status.code, - op.status.details) + op.status.details, + op.flags) elif op.type == _types.OpType.RECV_INITIAL_METADATA: - translated_op = cygrpc.operation_receive_initial_metadata() + translated_op = cygrpc.operation_receive_initial_metadata( + op.flags) elif op.type == _types.OpType.RECV_MESSAGE: - translated_op = cygrpc.operation_receive_message() + translated_op = cygrpc.operation_receive_message(op.flags) elif op.type == _types.OpType.RECV_STATUS_ON_CLIENT: - translated_op = cygrpc.operation_receive_status_on_client() + translated_op = cygrpc.operation_receive_status_on_client( + op.flags) elif op.type == _types.OpType.RECV_CLOSE_ON_SERVER: - translated_op = cygrpc.operation_receive_close_on_server() + translated_op = cygrpc.operation_receive_close_on_server(op.flags) else: raise ValueError('unexpected operation type {}'.format(op.type)) translated_ops.append(translated_op) diff --git a/src/python/grpcio/grpc/_adapter/_types.py b/src/python/grpcio/grpc/_adapter/_types.py index 8ca7ff4b60..f8405949d4 100644 --- a/src/python/grpcio/grpc/_adapter/_types.py +++ b/src/python/grpcio/grpc/_adapter/_types.py @@ -152,7 +152,7 @@ class OpArgs(collections.namedtuple( 'trailing_metadata', 'message', 'status', - 'write_flags', + 'flags', ])): """Arguments passed into a GRPC operation. @@ -165,7 +165,7 @@ class OpArgs(collections.namedtuple( message (bytes): Only valid if type == OpType.SEND_MESSAGE, else is None. status (Status): Only valid if type == OpType.SEND_STATUS_FROM_SERVER, else is None. - write_flags (int): a bit OR'ing of 0 or more OpWriteFlags values. + flags (int): a bitwise OR'ing of 0 or more OpWriteFlags values. """ @staticmethod diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 3d158a7707..66e6e6b549 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -140,6 +140,9 @@ cdef extern from "grpc/_cython/loader.h": const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING const char *GRPC_ARG_SECONDARY_USER_AGENT_STRING const char *GRPC_SSL_TARGET_NAME_OVERRIDE_ARG + const char *GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM + const char *GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL + const char *GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET const int GRPC_WRITE_BUFFER_HINT const int GRPC_WRITE_NO_COMPRESS @@ -425,3 +428,38 @@ cdef extern from "grpc/_cython/loader.h": grpc_call_credentials *grpc_metadata_credentials_create_from_plugin( grpc_metadata_credentials_plugin plugin, void *reserved) nogil + + ctypedef enum grpc_compression_algorithm: + GRPC_COMPRESS_NONE + GRPC_COMPRESS_DEFLATE + GRPC_COMPRESS_GZIP + GRPC_COMPRESS_ALGORITHMS_COUNT + + ctypedef enum grpc_compression_level: + GRPC_COMPRESS_LEVEL_NONE + GRPC_COMPRESS_LEVEL_LOW + GRPC_COMPRESS_LEVEL_MED + GRPC_COMPRESS_LEVEL_HIGH + GRPC_COMPRESS_LEVEL_COUNT + + ctypedef struct grpc_compression_options: + uint32_t enabled_algorithms_bitset + grpc_compression_algorithm default_compression_algorithm + + int grpc_compression_algorithm_parse( + const char *name, size_t name_length, + grpc_compression_algorithm *algorithm) nogil + int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm, + char **name) nogil + grpc_compression_algorithm grpc_compression_algorithm_for_level( + grpc_compression_level level, uint32_t accepted_encodings) nogil + void grpc_compression_options_init(grpc_compression_options *opts) nogil + void grpc_compression_options_enable_algorithm( + grpc_compression_options *opts, + grpc_compression_algorithm algorithm) nogil + void grpc_compression_options_disable_algorithm( + grpc_compression_options *opts, + grpc_compression_algorithm algorithm) nogil + int grpc_compression_options_is_algorithm_enabled( + const grpc_compression_options *opts, + grpc_compression_algorithm algorithm) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 30397818a1..0474697af8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -124,3 +124,7 @@ cdef class Operations: cdef size_t c_nops cdef list operations + +cdef class CompressionOptions: + + cdef grpc_compression_options c_options diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index c2202bdab2..c7539f0d49 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -103,6 +103,19 @@ class OperationType: receive_close_on_server = GRPC_OP_RECV_CLOSE_ON_SERVER +class CompressionAlgorithm: + none = GRPC_COMPRESS_NONE + deflate = GRPC_COMPRESS_DEFLATE + gzip = GRPC_COMPRESS_GZIP + + +class CompressionLevel: + none = GRPC_COMPRESS_LEVEL_NONE + low = GRPC_COMPRESS_LEVEL_LOW + medium = GRPC_COMPRESS_LEVEL_MED + high = GRPC_COMPRESS_LEVEL_HIGH + + cdef class Timespec: def __cinit__(self, time): @@ -473,6 +486,10 @@ cdef class Operation: return self.c_op.type @property + def flags(self): + return self.c_op.flags + + @property def has_status(self): return self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT @@ -553,9 +570,10 @@ cdef class Operation: with nogil: gpr_free(self._received_status_details) -def operation_send_initial_metadata(Metadata metadata): +def operation_send_initial_metadata(Metadata metadata, int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA + op.c_op.flags = flags op.c_op.data.send_initial_metadata.count = metadata.c_metadata_array.count op.c_op.data.send_initial_metadata.metadata = ( metadata.c_metadata_array.metadata) @@ -563,23 +581,25 @@ def operation_send_initial_metadata(Metadata metadata): op.is_valid = True return op -def operation_send_message(data): +def operation_send_message(data, int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_MESSAGE + op.c_op.flags = flags byte_buffer = ByteBuffer(data) op.c_op.data.send_message = byte_buffer.c_byte_buffer op.references.append(byte_buffer) op.is_valid = True return op -def operation_send_close_from_client(): +def operation_send_close_from_client(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT + op.c_op.flags = flags op.is_valid = True return op def operation_send_status_from_server( - Metadata metadata, grpc_status_code code, details): + Metadata metadata, grpc_status_code code, details, int flags): if isinstance(details, bytes): pass elif isinstance(details, basestring): @@ -588,6 +608,7 @@ def operation_send_status_from_server( raise TypeError("expected a str or bytes object for details") cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER + op.c_op.flags = flags op.c_op.data.send_status_from_server.trailing_metadata_count = ( metadata.c_metadata_array.count) op.c_op.data.send_status_from_server.trailing_metadata = ( @@ -599,18 +620,20 @@ def operation_send_status_from_server( op.is_valid = True return op -def operation_receive_initial_metadata(): +def operation_receive_initial_metadata(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA + op.c_op.flags = flags op._received_metadata = Metadata([]) op.c_op.data.receive_initial_metadata = ( &op._received_metadata.c_metadata_array) op.is_valid = True return op -def operation_receive_message(): +def operation_receive_message(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_MESSAGE + op.c_op.flags = flags op._received_message = ByteBuffer(None) # n.b. the c_op.data.receive_message field needs to be deleted by us, # anyway, so we just let that be handled by the ByteBuffer() we allocated @@ -619,9 +642,10 @@ def operation_receive_message(): op.is_valid = True return op -def operation_receive_status_on_client(): +def operation_receive_status_on_client(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT + op.c_op.flags = flags op._received_metadata = Metadata([]) op.c_op.data.receive_status_on_client.trailing_metadata = ( &op._received_metadata.c_metadata_array) @@ -634,9 +658,10 @@ def operation_receive_status_on_client(): op.is_valid = True return op -def operation_receive_close_on_server(): +def operation_receive_close_on_server(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER + op.c_op.flags = flags op.c_op.data.receive_close_on_server.cancelled = &op._received_cancelled op.is_valid = True return op @@ -692,3 +717,36 @@ cdef class Operations: def __iter__(self): return _OperationsIterator(self) + +cdef class CompressionOptions: + + def __cinit__(self): + with nogil: + grpc_compression_options_init(&self.c_options) + + def enable_algorithm(self, grpc_compression_algorithm algorithm): + with nogil: + grpc_compression_options_enable_algorithm(&self.c_options, algorithm) + + def disable_algorithm(self, grpc_compression_algorithm algorithm): + with nogil: + grpc_compression_options_disable_algorithm(&self.c_options, algorithm) + + def is_algorithm_enabled(self, grpc_compression_algorithm algorithm): + cdef int result + with nogil: + result = grpc_compression_options_is_algorithm_enabled( + &self.c_options, algorithm) + return result + + def to_channel_arg(self): + return ChannelArg(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, + self.c_options.enabled_algorithms_bitset) + + +def compression_algorithm_name(grpc_compression_algorithm algorithm): + cdef char* name + with nogil: + grpc_compression_algorithm_name(algorithm, &name) + # Let Cython do the right thing with string casting + return name diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h index 54c8aaad13..6de295414a 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.h +++ b/src/python/grpcio/grpc/_cython/imports.generated.h @@ -870,14 +870,15 @@ void pygrpc_load_imports(HMODULE library); #else /* !GPR_WIN32 */ -#include <grpc/support/alloc.h> -#include <grpc/support/slice.h> -#include <grpc/support/time.h> -#include <grpc/status.h> #include <grpc/byte_buffer.h> #include <grpc/byte_buffer_reader.h> +#include <grpc/compression.h> #include <grpc/grpc.h> #include <grpc/grpc_security.h> +#include <grpc/support/alloc.h> +#include <grpc/support/slice.h> +#include <grpc/support/time.h> +#include <grpc/status.h> #endif /* !GPR_WIN32 */ diff --git a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py index 876da88de9..0a511101f0 100644 --- a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py @@ -40,6 +40,7 @@ from tests.unit import resources _SSL_HOST_OVERRIDE = 'foo.test.google.fr' _CALL_CREDENTIALS_METADATA_KEY = 'call-creds-key' _CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value' +_EMPTY_FLAGS = 0 def _metadata_plugin_callback(context, callback): callback(cygrpc.Metadata( @@ -76,7 +77,7 @@ class TypeSmokeTest(unittest.TestCase): def testOperationsIteration(self): operations = cygrpc.Operations([ - cygrpc.operation_send_message('asdf')]) + cygrpc.operation_send_message('asdf', _EMPTY_FLAGS)]) iterator = iter(operations) operation = next(iterator) self.assertIsInstance(operation, cygrpc.Operation) @@ -85,6 +86,11 @@ class TypeSmokeTest(unittest.TestCase): with self.assertRaises(StopIteration): next(iterator) + def testOperationFlags(self): + operation = cygrpc.operation_send_message('asdf', + cygrpc.WriteFlag.no_compress) + self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags) + def testTimespec(self): now = time.time() timespec = cygrpc.Timespec(now) @@ -188,12 +194,13 @@ class InsecureServerInsecureClient(unittest.TestCase): CLIENT_METADATA_ASCII_VALUE), cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]) client_start_batch_result = client_call.start_batch(cygrpc.Operations([ - cygrpc.operation_send_initial_metadata(client_initial_metadata), - cygrpc.operation_send_message(REQUEST), - cygrpc.operation_send_close_from_client(), - cygrpc.operation_receive_initial_metadata(), - cygrpc.operation_receive_message(), - cygrpc.operation_receive_status_on_client() + cygrpc.operation_send_initial_metadata(client_initial_metadata, + _EMPTY_FLAGS), + cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS), + cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), + cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) ]), client_call_tag) self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) client_event_future = test_utilities.CompletionQueuePollFuture( @@ -223,12 +230,14 @@ class InsecureServerInsecureClient(unittest.TestCase): cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE)]) server_start_batch_result = server_call.start_batch([ - cygrpc.operation_send_initial_metadata(server_initial_metadata), - cygrpc.operation_receive_message(), - cygrpc.operation_send_message(RESPONSE), - cygrpc.operation_receive_close_on_server(), + cygrpc.operation_send_initial_metadata(server_initial_metadata, + _EMPTY_FLAGS), + cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS), + cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), cygrpc.operation_send_status_from_server( - server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS) + server_trailing_metadata, SERVER_STATUS_CODE, + SERVER_STATUS_DETAILS, _EMPTY_FLAGS) ], server_call_tag) self.assertEqual(cygrpc.CallError.ok, server_start_batch_result) @@ -349,12 +358,13 @@ class SecureServerSecureClient(unittest.TestCase): CLIENT_METADATA_ASCII_VALUE), cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]) client_start_batch_result = client_call.start_batch(cygrpc.Operations([ - cygrpc.operation_send_initial_metadata(client_initial_metadata), - cygrpc.operation_send_message(REQUEST), - cygrpc.operation_send_close_from_client(), - cygrpc.operation_receive_initial_metadata(), - cygrpc.operation_receive_message(), - cygrpc.operation_receive_status_on_client() + cygrpc.operation_send_initial_metadata(client_initial_metadata, + _EMPTY_FLAGS), + cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS), + cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), + cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) ]), client_call_tag) self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) client_event_future = test_utilities.CompletionQueuePollFuture( @@ -387,12 +397,14 @@ class SecureServerSecureClient(unittest.TestCase): cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE)]) server_start_batch_result = server_call.start_batch([ - cygrpc.operation_send_initial_metadata(server_initial_metadata), - cygrpc.operation_receive_message(), - cygrpc.operation_send_message(RESPONSE), - cygrpc.operation_receive_close_on_server(), + cygrpc.operation_send_initial_metadata(server_initial_metadata, + _EMPTY_FLAGS), + cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS), + cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), cygrpc.operation_send_status_from_server( - server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS) + server_trailing_metadata, SERVER_STATUS_CODE, + SERVER_STATUS_DETAILS, _EMPTY_FLAGS) ], server_call_tag) self.assertEqual(cygrpc.CallError.ok, server_start_batch_result) diff --git a/templates/src/python/grpcio/grpc/_cython/imports.generated.h.template b/templates/src/python/grpcio/grpc/_cython/imports.generated.h.template index 8e7c183180..26e717e58d 100644 --- a/templates/src/python/grpcio/grpc/_cython/imports.generated.h.template +++ b/templates/src/python/grpcio/grpc/_cython/imports.generated.h.template @@ -64,14 +64,15 @@ #else /* !GPR_WIN32 */ - #include <grpc/support/alloc.h> - #include <grpc/support/slice.h> - #include <grpc/support/time.h> - #include <grpc/status.h> #include <grpc/byte_buffer.h> #include <grpc/byte_buffer_reader.h> + #include <grpc/compression.h> #include <grpc/grpc.h> #include <grpc/grpc_security.h> + #include <grpc/support/alloc.h> + #include <grpc/support/slice.h> + #include <grpc/support/time.h> + #include <grpc/status.h> #endif /* !GPR_WIN32 */ diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index f13960156f..265e0acee1 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -89,6 +89,7 @@ static int free_chosen_port(int port) { grpc_free_port_using_server(env, port); } } + gpr_free(env); return found; } diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index cba52b111f..189e4a8aab 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -60,7 +60,7 @@ static const char* kRandomFile = "test/cpp/interop/rnd.dat"; namespace { // The same value is defined by the Java client. const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904}; -const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979}; +const std::vector<int> response_stream_sizes = {31415, 59, 2653, 58979}; const int kNumResponseMessages = 2000; const int kResponseMessageSize = 1030; const int kReceiveDelayMilliSeconds = 20; diff --git a/tools/distrib/check_include_guards.py b/tools/distrib/check_include_guards.py index 9c23a70e00..ef770b30b4 100755 --- a/tools/distrib/check_include_guards.py +++ b/tools/distrib/check_include_guards.py @@ -98,6 +98,7 @@ class GuardValidator(object): match = self.ifndef_re.search(fcontents) if not match: print 'something drastically wrong with: %s' % fpath + return False # failed if match.lastindex is None: # No ifndef. Request manual addition with hints self.fail(fpath, match.re, match.string, '', '', False) diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index 4fe66dff41..77b158f27e 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -69,6 +69,10 @@ DEEP=100 # wide is the number of client channels in multi-channel tests (1 otherwise) WIDE=64 +# For most synchronous clients, DEEP*WIDE threads will be created. +SYNC_DEEP=10 +SYNC_WIDE=8 + def _get_secargs(is_secure): if is_secure: @@ -81,6 +85,7 @@ def remove_nonproto_fields(scenario): """Remove special-purpose that contains some extra info about the scenario but don't belong to the ScenarioConfig protobuf message""" scenario.pop('CATEGORIES', None) + scenario.pop('CLIENT_LANGUAGE', None) scenario.pop('SERVER_LANGUAGE', None) return scenario @@ -89,7 +94,8 @@ def _ping_pong_scenario(name, rpc_type, client_type, server_type, secure=True, use_generic_payload=False, - use_unconstrained_client=False, + unconstrained_client=None, + client_language=None, server_language=None, server_core_limit=0, async_server_threads=0, @@ -130,18 +136,28 @@ def _ping_pong_scenario(name, rpc_type, # For proto payload, only the client should get the config. scenario['client_config']['payload_config'] = EMPTY_PROTO_PAYLOAD - if use_unconstrained_client: + if unconstrained_client: + if unconstrained_client == 'async': + deep = DEEP + wide = WIDE + elif unconstrained_client == 'sync': + deep = SYNC_DEEP + wide = SYNC_WIDE + else: + raise Exception('Illegal value of unconstrained_client option.') + scenario['num_clients'] = 0 # use as many client as available. - # TODO(jtattermusch): for SYNC_CLIENT, this will create 100*64 threads - # and that's probably too much (at least for wrapped languages). - scenario['client_config']['outstanding_rpcs_per_channel'] = DEEP - scenario['client_config']['client_channels'] = WIDE + scenario['client_config']['outstanding_rpcs_per_channel'] = deep + scenario['client_config']['client_channels'] = wide scenario['client_config']['async_client_threads'] = 0 else: scenario['client_config']['outstanding_rpcs_per_channel'] = 1 scenario['client_config']['client_channels'] = 1 scenario['client_config']['async_client_threads'] = 1 + if client_language: + # the CLIENT_LANGUAGE field is recognized by run_performance_tests.py + scenario['CLIENT_LANGUAGE'] = client_language if server_language: # the SERVER_LANGUAGE field is recognized by run_performance_tests.py scenario['SERVER_LANGUAGE'] = server_language @@ -196,27 +212,27 @@ class CXXLanguage: yield _ping_pong_scenario( 'cpp_protobuf_async_unary_qps_unconstrained_%s' % secstr, rpc_type='UNARY', client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', - use_unconstrained_client=True, + unconstrained_client='async', secure=secure, categories=smoketest_categories) yield _ping_pong_scenario( 'cpp_protobuf_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', - use_unconstrained_client=True, + unconstrained_client='async', secure=secure) yield _ping_pong_scenario( 'cpp_generic_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - use_unconstrained_client=True, use_generic_payload=True, + unconstrained_client='async', use_generic_payload=True, secure=secure, categories=smoketest_categories) yield _ping_pong_scenario( 'cpp_generic_async_streaming_qps_one_server_core_%s' % secstr, rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - use_unconstrained_client=True, use_generic_payload=True, + unconstrained_client='async', use_generic_payload=True, server_core_limit=1, async_server_threads=1, secure=secure) @@ -258,13 +274,13 @@ class CSharpLanguage: yield _ping_pong_scenario( 'csharp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY', client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', - use_unconstrained_client=True, + unconstrained_client='async', categories=[SMOKETEST]) yield _ping_pong_scenario( 'csharp_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', - use_unconstrained_client=True) + unconstrained_client='async') yield _ping_pong_scenario( 'csharp_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY', @@ -277,6 +293,22 @@ class CSharpLanguage: client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', server_language='c++', server_core_limit=1, async_server_threads=1) + yield _ping_pong_scenario( + 'csharp_to_cpp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY', + client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + unconstrained_client='async', server_language='c++') + + yield _ping_pong_scenario( + 'csharp_to_cpp_protobuf_sync_to_async_unary_qps_unconstrained', rpc_type='UNARY', + client_type='SYNC_CLIENT', server_type='ASYNC_SERVER', + unconstrained_client='sync', server_language='c++') + + yield _ping_pong_scenario( + 'cpp_to_csharp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY', + client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + unconstrained_client='async', client_language='c++') + + def __str__(self): return 'csharp' @@ -313,14 +345,14 @@ class NodeLanguage: yield _ping_pong_scenario( 'node_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY', client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', - use_unconstrained_client=True, + unconstrained_client='async', categories=[SMOKETEST]) # TODO(jtattermusch): make this scenario work #yield _ping_pong_scenario( # 'node_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING', # client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', - # use_unconstrained_client=True) + # unconstrained_client='async') # TODO(jtattermusch): make this scenario work #yield _ping_pong_scenario( @@ -369,19 +401,15 @@ class PythonLanguage: client_type='SYNC_CLIENT', server_type='SYNC_SERVER', categories=[SMOKETEST]) - # TODO(jtattermusch): - # The qps_worker server gets thread starved with ~6400 threads, the GIL - # enforces that a single thread runs at a time, with no way to set thread - # priority. Re-evaluate after changing DEEP and WIDE. - #yield _ping_pong_scenario( - # 'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY', - # client_type='SYNC_CLIENT', server_type='SYNC_SERVER', - # use_unconstrained_client=True) + yield _ping_pong_scenario( + 'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY', + client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + unconstrained_client='sync') yield _ping_pong_scenario( 'python_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='SYNC_SERVER', - use_unconstrained_client=True) + unconstrained_client='async') yield _ping_pong_scenario( 'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY', @@ -420,17 +448,15 @@ class RubyLanguage: client_type='SYNC_CLIENT', server_type='SYNC_SERVER', categories=[SMOKETEST]) - # TODO: scenario reports QPS of 0.0 - #yield _ping_pong_scenario( - # 'ruby_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY', - # client_type='SYNC_CLIENT', server_type='SYNC_SERVER', - # use_unconstrained_client=True) + yield _ping_pong_scenario( + 'ruby_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY', + client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + unconstrained_client='sync') - # TODO: scenario reports QPS of 0.0 - #yield _ping_pong_scenario( - # 'ruby_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING', - # client_type='SYNC_CLIENT', server_type='SYNC_SERVER', - # use_unconstrained_client=True) + yield _ping_pong_scenario( + 'ruby_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING', + client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + unconstrained_client='sync') yield _ping_pong_scenario( 'ruby_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY', @@ -492,26 +518,26 @@ class JavaLanguage: yield _ping_pong_scenario( 'java_protobuf_async_unary_qps_unconstrained_%s' % secstr, rpc_type='UNARY', client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', - use_unconstrained_client=True, + unconstrained_client='async', secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS, categories=smoketest_categories) yield _ping_pong_scenario( 'java_protobuf_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', - use_unconstrained_client=True, + unconstrained_client='async', secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS) yield _ping_pong_scenario( 'java_generic_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - use_unconstrained_client=True, use_generic_payload=True, + unconstrained_client='async', use_generic_payload=True, secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS) yield _ping_pong_scenario( 'java_generic_async_streaming_qps_one_server_core_%s' % secstr, rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - use_unconstrained_client=True, use_generic_payload=True, + unconstrained_client='async', use_generic_payload=True, async_server_threads=1, secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS) @@ -560,25 +586,28 @@ class GoLanguage: secure=secure, categories=smoketest_categories) + # unconstrained_client='async' is intended (client uses goroutines) yield _ping_pong_scenario( 'go_protobuf_sync_unary_qps_unconstrained_%s' % secstr, rpc_type='UNARY', client_type='SYNC_CLIENT', server_type='SYNC_SERVER', - use_unconstrained_client=True, + unconstrained_client='async', secure=secure, categories=smoketest_categories) + # unconstrained_client='async' is intended (client uses goroutines) yield _ping_pong_scenario( 'go_protobuf_sync_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING', client_type='SYNC_CLIENT', server_type='SYNC_SERVER', - use_unconstrained_client=True, + unconstrained_client='async', secure=secure) + # unconstrained_client='async' is intended (client uses goroutines) # ASYNC_GENERIC_SERVER for Go actually uses a sync streaming server, # but that's mostly because of lack of better name of the enum value. yield _ping_pong_scenario( 'go_generic_sync_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING', client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - use_unconstrained_client=True, use_generic_payload=True, + unconstrained_client='async', use_generic_payload=True, secure=secure) # TODO(jtattermusch): add scenarios go vs C++ diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py index e813473421..edbdf05e2a 100755 --- a/tools/run_tests/run_interop_tests.py +++ b/tools/run_tests/run_interop_tests.py @@ -82,10 +82,10 @@ class CXXLanguage: return {} def unimplemented_test_cases(self): - return _SKIP_ADVANCED + _SKIP_COMPRESSION + return _SKIP_ADVANCED def unimplemented_test_cases_server(self): - return _SKIP_ADVANCED + _SKIP_COMPRESSION + return _SKIP_ADVANCED def __str__(self): return 'c++' diff --git a/tools/run_tests/run_performance_tests.py b/tools/run_tests/run_performance_tests.py index a7728e7f7d..181d62bf4a 100755 --- a/tools/run_tests/run_performance_tests.py +++ b/tools/run_tests/run_performance_tests.py @@ -304,7 +304,11 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*', # 'SERVER_LANGUAGE' is an indicator for this script to pick # a server in different language. custom_server_lang = scenario_json.get('SERVER_LANGUAGE', None) + custom_client_lang = scenario_json.get('CLIENT_LANGUAGE', None) scenario_json = scenario_config.remove_nonproto_fields(scenario_json) + if custom_server_lang and custom_client_lang: + raise Exception('Cannot set both custom CLIENT_LANGUAGE and SERVER_LANGUAGE' + 'in the same scenario') if custom_server_lang: if not workers_by_lang.get(custom_server_lang, []): print 'Warning: Skipping scenario %s as' % scenario_json['name'] @@ -314,6 +318,16 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*', for idx in range(0, scenario_json['num_servers']): # replace first X workers by workers of a different language workers[idx] = workers_by_lang[custom_server_lang][idx] + if custom_client_lang: + if not workers_by_lang.get(custom_client_lang, []): + print 'Warning: Skipping scenario %s as' % scenario_json['name'] + print('CLIENT_LANGUAGE is set to %s yet the language has ' + 'not been selected with -l' % custom_client_lang) + continue + for idx in range(scenario_json['num_servers'], len(workers)): + # replace all client workers by workers of a different language, + # leave num_server workers as they are server workers. + workers[idx] = workers_by_lang[custom_client_lang][idx] scenario = create_scenario_jobspec(scenario_json, workers, remote_host=remote_host, @@ -360,8 +374,8 @@ argp.add_argument('--bq_result_table', default=None, type=str, help='Bigquery "dataset.table" to upload results to.') argp.add_argument('--category', choices=['smoketest','all'], - default='smoketest', - help='Select a category of tests to run. Smoketest runs by default.') + default='all', + help='Select a category of tests to run.') argp.add_argument('--netperf', default=False, action='store_const', |