diff options
Diffstat (limited to 'src')
22 files changed, 248 insertions, 94 deletions
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 5510c79b18..0e548c61b8 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -47,7 +47,7 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" -int grpc_compress_filter_trace = 0; +int grpc_compression_trace = 0; typedef struct call_data { gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ @@ -171,7 +171,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, did_compress = grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { - if (grpc_compress_filter_trace) { + if (grpc_compression_trace) { char *algo_name; const size_t before_size = calld->slices.length; const size_t after_size = tmp.length; @@ -185,12 +185,14 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, gpr_slice_buffer_swap(&calld->slices, &tmp); calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { - if (grpc_compress_filter_trace) { + if (grpc_compression_trace) { char *algo_name; GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, &algo_name)); - gpr_log(GPR_DEBUG, "Algorithm '%s' enabled but decided not to compress.", - algo_name); + gpr_log( + GPR_DEBUG, + "Algorithm '%s' enabled but decided not to compress. Input size: %d", + algo_name, calld->slices.length); } } diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h index cf5879d82e..0ce5d08837 100644 --- a/src/core/lib/channel/compress_filter.h +++ b/src/core/lib/channel/compress_filter.h @@ -38,7 +38,7 @@ #define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request" -extern int grpc_compress_filter_trace; +extern int grpc_compression_trace; /** Compression filter for outgoing data. * diff --git a/src/core/lib/surface/byte_buffer_reader.c b/src/core/lib/surface/byte_buffer_reader.c index 809fd5f1fa..c97079f638 100644 --- a/src/core/lib/surface/byte_buffer_reader.c +++ b/src/core/lib/surface/byte_buffer_reader.c @@ -62,12 +62,19 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, case GRPC_BB_RAW: gpr_slice_buffer_init(&decompressed_slices_buffer); if (is_compressed(reader->buffer_in)) { - grpc_msg_decompress(reader->buffer_in->data.raw.compression, - &reader->buffer_in->data.raw.slice_buffer, - &decompressed_slices_buffer); - reader->buffer_out = - grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices, - decompressed_slices_buffer.count); + if (grpc_msg_decompress(reader->buffer_in->data.raw.compression, + &reader->buffer_in->data.raw.slice_buffer, + &decompressed_slices_buffer) == 0) { + gpr_log(GPR_ERROR, + "Unexpected error decompressing data for algorithm with enum " + "value '%d'. Reading data as if it were uncompressed.", + reader->buffer_in->data.raw.compression); + reader->buffer_out = reader->buffer_in; + } else { /* all fine */ + reader->buffer_out = + grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices, + decompressed_slices_buffer.count); + } gpr_slice_buffer_destroy(&decompressed_slices_buffer); } else { /* not compressed, use the input buffer as output */ reader->buffer_out = reader->buffer_in; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 9b2b94eedf..c8728fa278 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -261,6 +261,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, call->channel = channel; call->cq = cq; call->parent = parent_call; + /* Always support no compression */ + GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = server_transport_data == NULL; if (call->is_client) { GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); @@ -408,6 +410,7 @@ static void set_status_code(grpc_call *call, status_source source, static void set_compression_algorithm(grpc_call *call, grpc_compression_algorithm algo) { + GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT); call->compression_algorithm = algo; } @@ -828,12 +831,16 @@ static uint32_t decode_status(grpc_mdelem *md) { return status; } -static uint32_t decode_compression(grpc_mdelem *md) { +static grpc_compression_algorithm decode_compression(grpc_mdelem *md) { grpc_compression_algorithm algorithm = grpc_compression_algorithm_from_mdstr(md->value); if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); - gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); + gpr_log(GPR_ERROR, + "Invalid incoming compression algorithm: '%s'. Interpreting " + "incoming data as uncompressed.", + md_c_str); + return GRPC_COMPRESS_NONE; } return algorithm; } @@ -1087,6 +1094,24 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; grpc_metadata_batch_filter(md, recv_initial_filter, call); + /* make sure the received grpc-encoding is amongst the ones listed in + * grpc-accept-encoding */ + + GPR_ASSERT(call->encodings_accepted_by_peer != 0); + if (!GPR_BITGET(call->encodings_accepted_by_peer, + call->compression_algorithm)) { + extern int grpc_compression_trace; + if (grpc_compression_trace) { + char *algo_name; + grpc_compression_algorithm_name(call->compression_algorithm, + &algo_name); + gpr_log(GPR_ERROR, + "Compression algorithm (grpc-encoding = '%s') not present in " + "the bitset of accepted encodings (grpc-accept-encodings: " + "'0x%x')", + algo_name, call->encodings_accepted_by_peer); + } + } if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != 0 && !call->is_client) { @@ -1474,7 +1499,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, grpc_call_error err; GRPC_API_TRACE( - "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)", + "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, " + "reserved=%p)", 5, (call, ops, (unsigned long)nops, tag, reserved)); if (reserved != NULL) { diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 57c6897626..1c8b709015 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -164,7 +164,7 @@ void grpc_init(void) { grpc_register_tracer("channel_stack_builder", &grpc_trace_channel_stack_builder); grpc_register_tracer("http1", &grpc_http1_trace); - grpc_register_tracer("compression", &grpc_compress_filter_trace); + grpc_register_tracer("compression", &grpc_compression_trace); grpc_security_pre_init(); grpc_iomgr_init(); grpc_executor_init(); diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index a2c1c08169..884130e7d4 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -89,7 +89,7 @@ zend_object_value create_wrapped_grpc_call(zend_class_entry *class_type /* Wraps a grpc_call struct in a PHP object. Owned indicates whether the struct should be destroyed at the end of the object's lifecycle */ -zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned) { +zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned TSRMLS_DC) { zval *call_object; MAKE_STD_ZVAL(call_object); object_init_ex(call_object, grpc_ce_call); @@ -102,7 +102,7 @@ zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned) { /* Creates and returns a PHP array object with the data in a * grpc_metadata_array. Returns NULL on failure */ -zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array) { +zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array TSRMLS_DC) { int count = metadata_array->count; grpc_metadata *elements = metadata_array->metadata; int i; @@ -127,7 +127,7 @@ zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array) { if (zend_hash_find(array_hash, str_key, key_len, (void **)data) == SUCCESS) { if (Z_TYPE_P(*data) != IS_ARRAY) { - zend_throw_exception(zend_exception_get_default(), + zend_throw_exception(zend_exception_get_default(TSRMLS_C), "Metadata hash somehow contains wrong types.", 1 TSRMLS_CC); efree(str_key); @@ -454,7 +454,7 @@ PHP_METHOD(Call, startBatch) { add_property_bool(result, "send_status", true); break; case GRPC_OP_RECV_INITIAL_METADATA: - array = grpc_parse_metadata_array(&recv_metadata); + array = grpc_parse_metadata_array(&recv_metadata TSRMLS_CC); add_property_zval(result, "metadata", array); Z_DELREF_P(array); break; @@ -470,7 +470,7 @@ PHP_METHOD(Call, startBatch) { case GRPC_OP_RECV_STATUS_ON_CLIENT: MAKE_STD_ZVAL(recv_status); object_init(recv_status); - array = grpc_parse_metadata_array(&recv_trailing_metadata); + array = grpc_parse_metadata_array(&recv_trailing_metadata TSRMLS_CC); add_property_zval(recv_status, "metadata", array); Z_DELREF_P(array); add_property_long(recv_status, "code", status); diff --git a/src/php/ext/grpc/call.h b/src/php/ext/grpc/call.h index 73efadae35..36c5f2d272 100644 --- a/src/php/ext/grpc/call.h +++ b/src/php/ext/grpc/call.h @@ -60,11 +60,11 @@ typedef struct wrapped_grpc_call { void grpc_init_call(TSRMLS_D); /* Creates a Call object that wraps the given grpc_call struct */ -zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned); +zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned TSRMLS_DC); /* Creates and returns a PHP associative array of metadata from a C array of * call metadata */ -zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array); +zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array TSRMLS_DC); /* Populates a grpc_metadata_array with the data in a PHP array object. Returns true on success and false on failure */ diff --git a/src/php/ext/grpc/call_credentials.c b/src/php/ext/grpc/call_credentials.c index 285c4e7c85..ec0e6b9181 100644 --- a/src/php/ext/grpc/call_credentials.c +++ b/src/php/ext/grpc/call_credentials.c @@ -83,7 +83,7 @@ zend_object_value create_wrapped_grpc_call_credentials( return retval; } -zval *grpc_php_wrap_call_credentials(grpc_call_credentials *wrapped) { +zval *grpc_php_wrap_call_credentials(grpc_call_credentials *wrapped TSRMLS_DC) { zval *credentials_object; MAKE_STD_ZVAL(credentials_object); object_init_ex(credentials_object, grpc_ce_call_credentials); @@ -122,7 +122,7 @@ PHP_METHOD(CallCredentials, createComposite) { grpc_call_credentials *creds = grpc_composite_call_credentials_create(cred1->wrapped, cred2->wrapped, NULL); - zval *creds_object = grpc_php_wrap_call_credentials(creds); + zval *creds_object = grpc_php_wrap_call_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } @@ -141,7 +141,7 @@ PHP_METHOD(CallCredentials, createFromPlugin) { memset(fci_cache, 0, sizeof(zend_fcall_info_cache)); /* "f" == 1 function */ - if (zend_parse_parameters(ZEND_NUM_ARGS(), "f", fci, + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f", fci, fci_cache, fci->params, fci->param_count) == FAILURE) { @@ -167,7 +167,7 @@ PHP_METHOD(CallCredentials, createFromPlugin) { grpc_call_credentials *creds = grpc_metadata_credentials_create_from_plugin( plugin, NULL); - zval *creds_object = grpc_php_wrap_call_credentials(creds); + zval *creds_object = grpc_php_wrap_call_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } @@ -175,6 +175,8 @@ PHP_METHOD(CallCredentials, createFromPlugin) { void plugin_get_metadata(void *ptr, grpc_auth_metadata_context context, grpc_credentials_plugin_metadata_cb cb, void *user_data) { + TSRMLS_FETCH(); + plugin_state *state = (plugin_state *)ptr; /* prepare to call the user callback function with info from the @@ -192,7 +194,7 @@ void plugin_get_metadata(void *ptr, grpc_auth_metadata_context context, state->fci->retval_ptr_ptr = &retval; /* call the user callback function */ - zend_call_function(state->fci, state->fci_cache); + zend_call_function(state->fci, state->fci_cache TSRMLS_CC); if (Z_TYPE_P(retval) != IS_ARRAY) { zend_throw_exception(spl_ce_InvalidArgumentException, diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c index eba2c81424..9f0431908f 100644 --- a/src/php/ext/grpc/channel.c +++ b/src/php/ext/grpc/channel.c @@ -84,7 +84,7 @@ zend_object_value create_wrapped_grpc_channel(zend_class_entry *class_type return retval; } -void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args) { +void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args TSRMLS_DC) { HashTable *array_hash; HashPosition array_pointer; int args_index; @@ -168,7 +168,7 @@ PHP_METHOD(Channel, __construct) { zend_hash_del(array_hash, "credentials", 12); } } - php_grpc_read_args_array(args_array, &args); + php_grpc_read_args_array(args_array, &args TSRMLS_CC); if (creds == NULL) { channel->wrapped = grpc_insecure_channel_create(target, &args, NULL); } else { diff --git a/src/php/ext/grpc/channel.h b/src/php/ext/grpc/channel.h index 78a16ed0c9..cc5823ee7f 100755 --- a/src/php/ext/grpc/channel.h +++ b/src/php/ext/grpc/channel.h @@ -59,6 +59,6 @@ typedef struct wrapped_grpc_channel { void grpc_init_channel(TSRMLS_D); /* Iterates through a PHP array and populates args with the contents */ -void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args); +void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args TSRMLS_DC); #endif /* NET_GRPC_PHP_GRPC_CHANNEL_H_ */ diff --git a/src/php/ext/grpc/channel_credentials.c b/src/php/ext/grpc/channel_credentials.c index ae9a9897fc..5c537378a6 100644 --- a/src/php/ext/grpc/channel_credentials.c +++ b/src/php/ext/grpc/channel_credentials.c @@ -82,7 +82,7 @@ zend_object_value create_wrapped_grpc_channel_credentials( return retval; } -zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped) { +zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped TSRMLS_DC) { zval *credentials_object; MAKE_STD_ZVAL(credentials_object); object_init_ex(credentials_object, grpc_ce_channel_credentials); @@ -99,7 +99,7 @@ zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped) { */ PHP_METHOD(ChannelCredentials, createDefault) { grpc_channel_credentials *creds = grpc_google_default_credentials_create(); - zval *creds_object = grpc_php_wrap_channel_credentials(creds); + zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } @@ -134,7 +134,7 @@ PHP_METHOD(ChannelCredentials, createSsl) { grpc_channel_credentials *creds = grpc_ssl_credentials_create( pem_root_certs, pem_key_cert_pair.private_key == NULL ? NULL : &pem_key_cert_pair, NULL); - zval *creds_object = grpc_php_wrap_channel_credentials(creds); + zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } @@ -165,7 +165,7 @@ PHP_METHOD(ChannelCredentials, createComposite) { grpc_channel_credentials *creds = grpc_composite_channel_credentials_create(cred1->wrapped, cred2->wrapped, NULL); - zval *creds_object = grpc_php_wrap_channel_credentials(creds); + zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c index ca129e76ca..6df2e4f978 100644 --- a/src/php/ext/grpc/server.c +++ b/src/php/ext/grpc/server.c @@ -111,7 +111,7 @@ PHP_METHOD(Server, __construct) { if (args_array == NULL) { server->wrapped = grpc_server_create(NULL, NULL); } else { - php_grpc_read_args_array(args_array, &args); + php_grpc_read_args_array(args_array, &args TSRMLS_CC); server->wrapped = grpc_server_create(&args, NULL); efree(args.args); } @@ -154,12 +154,12 @@ PHP_METHOD(Server, requestCall) { 1 TSRMLS_CC); goto cleanup; } - add_property_zval(result, "call", grpc_php_wrap_call(call, true)); + add_property_zval(result, "call", grpc_php_wrap_call(call, true TSRMLS_CC)); add_property_string(result, "method", details.method, true); add_property_string(result, "host", details.host, true); add_property_zval(result, "absolute_deadline", - grpc_php_wrap_timeval(details.deadline)); - add_property_zval(result, "metadata", grpc_parse_metadata_array(&metadata)); + grpc_php_wrap_timeval(details.deadline TSRMLS_CC)); + add_property_zval(result, "metadata", grpc_parse_metadata_array(&metadata TSRMLS_CC)); cleanup: grpc_call_details_destroy(&details); grpc_metadata_array_destroy(&metadata); diff --git a/src/php/ext/grpc/server_credentials.c b/src/php/ext/grpc/server_credentials.c index f3951b31fe..505da10a28 100644 --- a/src/php/ext/grpc/server_credentials.c +++ b/src/php/ext/grpc/server_credentials.c @@ -81,7 +81,7 @@ zend_object_value create_wrapped_grpc_server_credentials( return retval; } -zval *grpc_php_wrap_server_credentials(grpc_server_credentials *wrapped) { +zval *grpc_php_wrap_server_credentials(grpc_server_credentials *wrapped TSRMLS_DC) { zval *server_credentials_object; MAKE_STD_ZVAL(server_credentials_object); object_init_ex(server_credentials_object, grpc_ce_server_credentials); @@ -120,7 +120,7 @@ PHP_METHOD(ServerCredentials, createSsl) { grpc_server_credentials *creds = grpc_ssl_server_credentials_create_ex( pem_root_certs, &pem_key_cert_pair, 1, GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE, NULL); - zval *creds_object = grpc_php_wrap_server_credentials(creds); + zval *creds_object = grpc_php_wrap_server_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } diff --git a/src/php/ext/grpc/timeval.c b/src/php/ext/grpc/timeval.c index 4fd069e19a..5e242162a8 100644 --- a/src/php/ext/grpc/timeval.c +++ b/src/php/ext/grpc/timeval.c @@ -72,7 +72,7 @@ zend_object_value create_wrapped_grpc_timeval(zend_class_entry *class_type return retval; } -zval *grpc_php_wrap_timeval(gpr_timespec wrapped) { +zval *grpc_php_wrap_timeval(gpr_timespec wrapped TSRMLS_DC) { zval *timeval_object; MAKE_STD_ZVAL(timeval_object); object_init_ex(timeval_object, grpc_ce_timeval); @@ -122,7 +122,7 @@ PHP_METHOD(Timeval, add) { wrapped_grpc_timeval *other = (wrapped_grpc_timeval *)zend_object_store_get_object(other_obj TSRMLS_CC); zval *sum = - grpc_php_wrap_timeval(gpr_time_add(self->wrapped, other->wrapped)); + grpc_php_wrap_timeval(gpr_time_add(self->wrapped, other->wrapped) TSRMLS_CC); RETURN_DESTROY_ZVAL(sum); } @@ -146,7 +146,7 @@ PHP_METHOD(Timeval, subtract) { wrapped_grpc_timeval *other = (wrapped_grpc_timeval *)zend_object_store_get_object(other_obj TSRMLS_CC); zval *diff = - grpc_php_wrap_timeval(gpr_time_sub(self->wrapped, other->wrapped)); + grpc_php_wrap_timeval(gpr_time_sub(self->wrapped, other->wrapped) TSRMLS_CC); RETURN_DESTROY_ZVAL(diff); } @@ -208,7 +208,7 @@ PHP_METHOD(Timeval, similar) { * @return Timeval The current time */ PHP_METHOD(Timeval, now) { - zval *now = grpc_php_wrap_timeval(gpr_now(GPR_CLOCK_REALTIME)); + zval *now = grpc_php_wrap_timeval(gpr_now(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_DESTROY_ZVAL(now); } @@ -218,7 +218,7 @@ PHP_METHOD(Timeval, now) { */ PHP_METHOD(Timeval, zero) { zval *grpc_php_timeval_zero = - grpc_php_wrap_timeval(gpr_time_0(GPR_CLOCK_REALTIME)); + grpc_php_wrap_timeval(gpr_time_0(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_ZVAL(grpc_php_timeval_zero, false, /* Copy original before returning? */ true /* Destroy original before returning */); @@ -230,7 +230,7 @@ PHP_METHOD(Timeval, zero) { */ PHP_METHOD(Timeval, infFuture) { zval *grpc_php_timeval_inf_future = - grpc_php_wrap_timeval(gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_php_wrap_timeval(gpr_inf_future(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_DESTROY_ZVAL(grpc_php_timeval_inf_future); } @@ -240,7 +240,7 @@ PHP_METHOD(Timeval, infFuture) { */ PHP_METHOD(Timeval, infPast) { zval *grpc_php_timeval_inf_past = - grpc_php_wrap_timeval(gpr_inf_past(GPR_CLOCK_REALTIME)); + grpc_php_wrap_timeval(gpr_inf_past(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_DESTROY_ZVAL(grpc_php_timeval_inf_past); } diff --git a/src/php/ext/grpc/timeval.h b/src/php/ext/grpc/timeval.h index 07cef037cb..7456eb6d58 100755 --- a/src/php/ext/grpc/timeval.h +++ b/src/php/ext/grpc/timeval.h @@ -63,6 +63,6 @@ void grpc_init_timeval(TSRMLS_D); void grpc_shutdown_timeval(TSRMLS_D); /* Creates a Timeval object that wraps the given timeval struct */ -zval *grpc_php_wrap_timeval(gpr_timespec wrapped); +zval *grpc_php_wrap_timeval(gpr_timespec wrapped TSRMLS_DC); #endif /* NET_GRPC_PHP_GRPC_TIMEVAL_H_ */ diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py index b13d8dd9dd..00788bd4cf 100644 --- a/src/python/grpcio/grpc/_adapter/_low.py +++ b/src/python/grpcio/grpc/_adapter/_low.py @@ -195,26 +195,30 @@ class Call(_types.Call): translated_op = cygrpc.operation_send_initial_metadata( cygrpc.Metadata( cygrpc.Metadatum(key, value) - for key, value in op.initial_metadata)) + for key, value in op.initial_metadata), + op.flags) elif op.type == _types.OpType.SEND_MESSAGE: - translated_op = cygrpc.operation_send_message(op.message) + translated_op = cygrpc.operation_send_message(op.message, op.flags) elif op.type == _types.OpType.SEND_CLOSE_FROM_CLIENT: - translated_op = cygrpc.operation_send_close_from_client() + translated_op = cygrpc.operation_send_close_from_client(op.flags) elif op.type == _types.OpType.SEND_STATUS_FROM_SERVER: translated_op = cygrpc.operation_send_status_from_server( cygrpc.Metadata( cygrpc.Metadatum(key, value) for key, value in op.trailing_metadata), op.status.code, - op.status.details) + op.status.details, + op.flags) elif op.type == _types.OpType.RECV_INITIAL_METADATA: - translated_op = cygrpc.operation_receive_initial_metadata() + translated_op = cygrpc.operation_receive_initial_metadata( + op.flags) elif op.type == _types.OpType.RECV_MESSAGE: - translated_op = cygrpc.operation_receive_message() + translated_op = cygrpc.operation_receive_message(op.flags) elif op.type == _types.OpType.RECV_STATUS_ON_CLIENT: - translated_op = cygrpc.operation_receive_status_on_client() + translated_op = cygrpc.operation_receive_status_on_client( + op.flags) elif op.type == _types.OpType.RECV_CLOSE_ON_SERVER: - translated_op = cygrpc.operation_receive_close_on_server() + translated_op = cygrpc.operation_receive_close_on_server(op.flags) else: raise ValueError('unexpected operation type {}'.format(op.type)) translated_ops.append(translated_op) diff --git a/src/python/grpcio/grpc/_adapter/_types.py b/src/python/grpcio/grpc/_adapter/_types.py index 8ca7ff4b60..f8405949d4 100644 --- a/src/python/grpcio/grpc/_adapter/_types.py +++ b/src/python/grpcio/grpc/_adapter/_types.py @@ -152,7 +152,7 @@ class OpArgs(collections.namedtuple( 'trailing_metadata', 'message', 'status', - 'write_flags', + 'flags', ])): """Arguments passed into a GRPC operation. @@ -165,7 +165,7 @@ class OpArgs(collections.namedtuple( message (bytes): Only valid if type == OpType.SEND_MESSAGE, else is None. status (Status): Only valid if type == OpType.SEND_STATUS_FROM_SERVER, else is None. - write_flags (int): a bit OR'ing of 0 or more OpWriteFlags values. + flags (int): a bitwise OR'ing of 0 or more OpWriteFlags values. """ @staticmethod diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 3d158a7707..66e6e6b549 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -140,6 +140,9 @@ cdef extern from "grpc/_cython/loader.h": const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING const char *GRPC_ARG_SECONDARY_USER_AGENT_STRING const char *GRPC_SSL_TARGET_NAME_OVERRIDE_ARG + const char *GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM + const char *GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL + const char *GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET const int GRPC_WRITE_BUFFER_HINT const int GRPC_WRITE_NO_COMPRESS @@ -425,3 +428,38 @@ cdef extern from "grpc/_cython/loader.h": grpc_call_credentials *grpc_metadata_credentials_create_from_plugin( grpc_metadata_credentials_plugin plugin, void *reserved) nogil + + ctypedef enum grpc_compression_algorithm: + GRPC_COMPRESS_NONE + GRPC_COMPRESS_DEFLATE + GRPC_COMPRESS_GZIP + GRPC_COMPRESS_ALGORITHMS_COUNT + + ctypedef enum grpc_compression_level: + GRPC_COMPRESS_LEVEL_NONE + GRPC_COMPRESS_LEVEL_LOW + GRPC_COMPRESS_LEVEL_MED + GRPC_COMPRESS_LEVEL_HIGH + GRPC_COMPRESS_LEVEL_COUNT + + ctypedef struct grpc_compression_options: + uint32_t enabled_algorithms_bitset + grpc_compression_algorithm default_compression_algorithm + + int grpc_compression_algorithm_parse( + const char *name, size_t name_length, + grpc_compression_algorithm *algorithm) nogil + int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm, + char **name) nogil + grpc_compression_algorithm grpc_compression_algorithm_for_level( + grpc_compression_level level, uint32_t accepted_encodings) nogil + void grpc_compression_options_init(grpc_compression_options *opts) nogil + void grpc_compression_options_enable_algorithm( + grpc_compression_options *opts, + grpc_compression_algorithm algorithm) nogil + void grpc_compression_options_disable_algorithm( + grpc_compression_options *opts, + grpc_compression_algorithm algorithm) nogil + int grpc_compression_options_is_algorithm_enabled( + const grpc_compression_options *opts, + grpc_compression_algorithm algorithm) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 30397818a1..0474697af8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -124,3 +124,7 @@ cdef class Operations: cdef size_t c_nops cdef list operations + +cdef class CompressionOptions: + + cdef grpc_compression_options c_options diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index c2202bdab2..c7539f0d49 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -103,6 +103,19 @@ class OperationType: receive_close_on_server = GRPC_OP_RECV_CLOSE_ON_SERVER +class CompressionAlgorithm: + none = GRPC_COMPRESS_NONE + deflate = GRPC_COMPRESS_DEFLATE + gzip = GRPC_COMPRESS_GZIP + + +class CompressionLevel: + none = GRPC_COMPRESS_LEVEL_NONE + low = GRPC_COMPRESS_LEVEL_LOW + medium = GRPC_COMPRESS_LEVEL_MED + high = GRPC_COMPRESS_LEVEL_HIGH + + cdef class Timespec: def __cinit__(self, time): @@ -473,6 +486,10 @@ cdef class Operation: return self.c_op.type @property + def flags(self): + return self.c_op.flags + + @property def has_status(self): return self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT @@ -553,9 +570,10 @@ cdef class Operation: with nogil: gpr_free(self._received_status_details) -def operation_send_initial_metadata(Metadata metadata): +def operation_send_initial_metadata(Metadata metadata, int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA + op.c_op.flags = flags op.c_op.data.send_initial_metadata.count = metadata.c_metadata_array.count op.c_op.data.send_initial_metadata.metadata = ( metadata.c_metadata_array.metadata) @@ -563,23 +581,25 @@ def operation_send_initial_metadata(Metadata metadata): op.is_valid = True return op -def operation_send_message(data): +def operation_send_message(data, int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_MESSAGE + op.c_op.flags = flags byte_buffer = ByteBuffer(data) op.c_op.data.send_message = byte_buffer.c_byte_buffer op.references.append(byte_buffer) op.is_valid = True return op -def operation_send_close_from_client(): +def operation_send_close_from_client(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT + op.c_op.flags = flags op.is_valid = True return op def operation_send_status_from_server( - Metadata metadata, grpc_status_code code, details): + Metadata metadata, grpc_status_code code, details, int flags): if isinstance(details, bytes): pass elif isinstance(details, basestring): @@ -588,6 +608,7 @@ def operation_send_status_from_server( raise TypeError("expected a str or bytes object for details") cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER + op.c_op.flags = flags op.c_op.data.send_status_from_server.trailing_metadata_count = ( metadata.c_metadata_array.count) op.c_op.data.send_status_from_server.trailing_metadata = ( @@ -599,18 +620,20 @@ def operation_send_status_from_server( op.is_valid = True return op -def operation_receive_initial_metadata(): +def operation_receive_initial_metadata(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA + op.c_op.flags = flags op._received_metadata = Metadata([]) op.c_op.data.receive_initial_metadata = ( &op._received_metadata.c_metadata_array) op.is_valid = True return op -def operation_receive_message(): +def operation_receive_message(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_MESSAGE + op.c_op.flags = flags op._received_message = ByteBuffer(None) # n.b. the c_op.data.receive_message field needs to be deleted by us, # anyway, so we just let that be handled by the ByteBuffer() we allocated @@ -619,9 +642,10 @@ def operation_receive_message(): op.is_valid = True return op -def operation_receive_status_on_client(): +def operation_receive_status_on_client(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT + op.c_op.flags = flags op._received_metadata = Metadata([]) op.c_op.data.receive_status_on_client.trailing_metadata = ( &op._received_metadata.c_metadata_array) @@ -634,9 +658,10 @@ def operation_receive_status_on_client(): op.is_valid = True return op -def operation_receive_close_on_server(): +def operation_receive_close_on_server(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER + op.c_op.flags = flags op.c_op.data.receive_close_on_server.cancelled = &op._received_cancelled op.is_valid = True return op @@ -692,3 +717,36 @@ cdef class Operations: def __iter__(self): return _OperationsIterator(self) + +cdef class CompressionOptions: + + def __cinit__(self): + with nogil: + grpc_compression_options_init(&self.c_options) + + def enable_algorithm(self, grpc_compression_algorithm algorithm): + with nogil: + grpc_compression_options_enable_algorithm(&self.c_options, algorithm) + + def disable_algorithm(self, grpc_compression_algorithm algorithm): + with nogil: + grpc_compression_options_disable_algorithm(&self.c_options, algorithm) + + def is_algorithm_enabled(self, grpc_compression_algorithm algorithm): + cdef int result + with nogil: + result = grpc_compression_options_is_algorithm_enabled( + &self.c_options, algorithm) + return result + + def to_channel_arg(self): + return ChannelArg(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, + self.c_options.enabled_algorithms_bitset) + + +def compression_algorithm_name(grpc_compression_algorithm algorithm): + cdef char* name + with nogil: + grpc_compression_algorithm_name(algorithm, &name) + # Let Cython do the right thing with string casting + return name diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h index e6750c4fbf..a364075e9e 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.h +++ b/src/python/grpcio/grpc/_cython/imports.generated.h @@ -873,14 +873,15 @@ void pygrpc_load_imports(HMODULE library); #else /* !GPR_WIN32 */ -#include <grpc/support/alloc.h> -#include <grpc/support/slice.h> -#include <grpc/support/time.h> -#include <grpc/status.h> #include <grpc/byte_buffer.h> #include <grpc/byte_buffer_reader.h> +#include <grpc/compression.h> #include <grpc/grpc.h> #include <grpc/grpc_security.h> +#include <grpc/support/alloc.h> +#include <grpc/support/slice.h> +#include <grpc/support/time.h> +#include <grpc/status.h> #endif /* !GPR_WIN32 */ diff --git a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py index 876da88de9..0a511101f0 100644 --- a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py @@ -40,6 +40,7 @@ from tests.unit import resources _SSL_HOST_OVERRIDE = 'foo.test.google.fr' _CALL_CREDENTIALS_METADATA_KEY = 'call-creds-key' _CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value' +_EMPTY_FLAGS = 0 def _metadata_plugin_callback(context, callback): callback(cygrpc.Metadata( @@ -76,7 +77,7 @@ class TypeSmokeTest(unittest.TestCase): def testOperationsIteration(self): operations = cygrpc.Operations([ - cygrpc.operation_send_message('asdf')]) + cygrpc.operation_send_message('asdf', _EMPTY_FLAGS)]) iterator = iter(operations) operation = next(iterator) self.assertIsInstance(operation, cygrpc.Operation) @@ -85,6 +86,11 @@ class TypeSmokeTest(unittest.TestCase): with self.assertRaises(StopIteration): next(iterator) + def testOperationFlags(self): + operation = cygrpc.operation_send_message('asdf', + cygrpc.WriteFlag.no_compress) + self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags) + def testTimespec(self): now = time.time() timespec = cygrpc.Timespec(now) @@ -188,12 +194,13 @@ class InsecureServerInsecureClient(unittest.TestCase): CLIENT_METADATA_ASCII_VALUE), cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]) client_start_batch_result = client_call.start_batch(cygrpc.Operations([ - cygrpc.operation_send_initial_metadata(client_initial_metadata), - cygrpc.operation_send_message(REQUEST), - cygrpc.operation_send_close_from_client(), - cygrpc.operation_receive_initial_metadata(), - cygrpc.operation_receive_message(), - cygrpc.operation_receive_status_on_client() + cygrpc.operation_send_initial_metadata(client_initial_metadata, + _EMPTY_FLAGS), + cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS), + cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), + cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) ]), client_call_tag) self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) client_event_future = test_utilities.CompletionQueuePollFuture( @@ -223,12 +230,14 @@ class InsecureServerInsecureClient(unittest.TestCase): cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE)]) server_start_batch_result = server_call.start_batch([ - cygrpc.operation_send_initial_metadata(server_initial_metadata), - cygrpc.operation_receive_message(), - cygrpc.operation_send_message(RESPONSE), - cygrpc.operation_receive_close_on_server(), + cygrpc.operation_send_initial_metadata(server_initial_metadata, + _EMPTY_FLAGS), + cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS), + cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), cygrpc.operation_send_status_from_server( - server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS) + server_trailing_metadata, SERVER_STATUS_CODE, + SERVER_STATUS_DETAILS, _EMPTY_FLAGS) ], server_call_tag) self.assertEqual(cygrpc.CallError.ok, server_start_batch_result) @@ -349,12 +358,13 @@ class SecureServerSecureClient(unittest.TestCase): CLIENT_METADATA_ASCII_VALUE), cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]) client_start_batch_result = client_call.start_batch(cygrpc.Operations([ - cygrpc.operation_send_initial_metadata(client_initial_metadata), - cygrpc.operation_send_message(REQUEST), - cygrpc.operation_send_close_from_client(), - cygrpc.operation_receive_initial_metadata(), - cygrpc.operation_receive_message(), - cygrpc.operation_receive_status_on_client() + cygrpc.operation_send_initial_metadata(client_initial_metadata, + _EMPTY_FLAGS), + cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS), + cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), + cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) ]), client_call_tag) self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) client_event_future = test_utilities.CompletionQueuePollFuture( @@ -387,12 +397,14 @@ class SecureServerSecureClient(unittest.TestCase): cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE)]) server_start_batch_result = server_call.start_batch([ - cygrpc.operation_send_initial_metadata(server_initial_metadata), - cygrpc.operation_receive_message(), - cygrpc.operation_send_message(RESPONSE), - cygrpc.operation_receive_close_on_server(), + cygrpc.operation_send_initial_metadata(server_initial_metadata, + _EMPTY_FLAGS), + cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS), + cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), cygrpc.operation_send_status_from_server( - server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS) + server_trailing_metadata, SERVER_STATUS_CODE, + SERVER_STATUS_DETAILS, _EMPTY_FLAGS) ], server_call_tag) self.assertEqual(cygrpc.CallError.ok, server_start_batch_result) |