From 45a520f51fb020e5bff992b71efa702d812134db Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Tue, 24 Jan 2017 09:40:21 -0800 Subject: Special case Python build for inet_ntop --- include/grpc/impl/codegen/port_platform.h | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'include/grpc/impl') diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h index 4633fa12e8..e565cd31d7 100644 --- a/include/grpc/impl/codegen/port_platform.h +++ b/include/grpc/impl/codegen/port_platform.h @@ -297,6 +297,10 @@ #endif #ifdef _MSC_VER +#ifdef _PYTHON_MSVC +// The Python 3.5 Windows runtime is missing InetNtop +#define GPR_WIN_INET_NTOP +#endif // _PYTHON_MSVC #if _MSC_VER < 1700 typedef __int8 int8_t; typedef __int16 int16_t; -- cgit v1.2.3 From e6dd773dffb9eaf30361f616bfb17c8a7ecebdac Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 25 Jan 2017 10:44:30 -0800 Subject: Move parameters for all grpc_op types into their own sub-structs. --- include/grpc++/impl/codegen/call.h | 9 +++++---- include/grpc/impl/codegen/grpc_types.h | 12 ++++++++--- src/core/ext/lb_policy/grpclb/grpclb.c | 9 +++++---- src/core/lib/surface/call.c | 13 +++++++----- src/core/lib/surface/call_log_batch.c | 8 +++++--- src/core/lib/surface/server.c | 5 +++-- src/csharp/ext/grpc_csharp_ext.c | 23 ++++++++++++---------- src/node/ext/call.cc | 6 +++--- .../GRPCClient/private/GRPCWrappedCall.m | 8 ++++---- .../tests/CronetUnitTests/CronetUnitTests.m | 6 +++--- src/php/ext/grpc/call.c | 9 +++++---- .../grpcio/grpc/_cython/_cygrpc/records.pyx.pxi | 7 ++++--- src/ruby/ext/grpc/rb_call.c | 12 ++++++----- 13 files changed, 74 insertions(+), 53 deletions(-) (limited to 'include/grpc/impl') diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 6ab00612f6..475f8d11bc 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -249,7 +249,7 @@ class CallOpSendMessage { op->op = GRPC_OP_SEND_MESSAGE; op->flags = write_options_.flags(); op->reserved = NULL; - op->data.send_message = send_buf_; + op->data.send_message.send_message = send_buf_; // Flags are per-message: clear them after use. write_options_.Clear(); } @@ -298,7 +298,7 @@ class CallOpRecvMessage { op->op = GRPC_OP_RECV_MESSAGE; op->flags = 0; op->reserved = NULL; - op->data.recv_message = &recv_buf_; + op->data.recv_message.recv_message = &recv_buf_; } void FinishOp(bool* status, int max_receive_message_size) { @@ -379,7 +379,7 @@ class CallOpGenericRecvMessage { op->op = GRPC_OP_RECV_MESSAGE; op->flags = 0; op->reserved = NULL; - op->data.recv_message = &recv_buf_; + op->data.recv_message.recv_message = &recv_buf_; } void FinishOp(bool* status, int max_receive_message_size) { @@ -486,7 +486,8 @@ class CallOpRecvInitialMetadata { memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_)); grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &recv_initial_metadata_arr_; + op->data.recv_initial_metadata.recv_initial_metadata = + &recv_initial_metadata_arr_; op->flags = 0; op->reserved = NULL; } diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index ee8101aab8..8d20f705ee 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -418,7 +418,9 @@ typedef struct grpc_op { grpc_compression_level level; } maybe_compression_level; } send_initial_metadata; - struct grpc_byte_buffer *send_message; + struct { + struct grpc_byte_buffer *send_message; + } send_message; struct { size_t trailing_metadata_count; grpc_metadata *trailing_metadata; @@ -430,11 +432,15 @@ typedef struct grpc_op { object, recv_initial_metadata->array is owned by the caller). After the operation completes, call grpc_metadata_array_destroy on this value, or reuse it in a future op. */ - grpc_metadata_array *recv_initial_metadata; + struct { + grpc_metadata_array *recv_initial_metadata; + } recv_initial_metadata; /** ownership of the byte buffer is moved to the caller; the caller must call grpc_byte_buffer_destroy on this value, or reuse it in a future op. */ - struct grpc_byte_buffer **recv_message; + struct { + struct grpc_byte_buffer **recv_message; + } recv_message; struct { /** ownership of the array is with the caller, but ownership of the elements stays with the call object (ie key, value members are owned diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 97f98df03a..567e65ac69 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -1178,14 +1178,15 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &glb_policy->lb_initial_metadata_recv; + op->data.recv_initial_metadata.recv_initial_metadata = + &glb_policy->lb_initial_metadata_recv; op->flags = 0; op->reserved = NULL; op++; GPR_ASSERT(glb_policy->lb_request_payload != NULL); op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message = glb_policy->lb_request_payload; + op->data.send_message.send_message = glb_policy->lb_request_payload; op->flags = 0; op->reserved = NULL; op++; @@ -1211,7 +1212,7 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op = ops; op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &glb_policy->lb_response_payload; + op->data.recv_message.recv_message = &glb_policy->lb_response_payload; op->flags = 0; op->reserved = NULL; op++; @@ -1293,7 +1294,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, if (!glb_policy->shutting_down) { /* keep listening for serverlist updates */ op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &glb_policy->lb_response_payload; + op->data.recv_message.recv_message = &glb_policy->lb_response_payload; op->flags = 0; op->reserved = NULL; op++; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 899e8fab3f..8de0b41fa4 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1461,7 +1461,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } - if (op->data.send_message == NULL) { + if (op->data.send_message.send_message == NULL) { error = GRPC_CALL_ERROR_INVALID_MESSAGE; goto done_with_error; } @@ -1473,11 +1473,13 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->sending_message = 1; grpc_slice_buffer_stream_init( &call->sending_stream, - &op->data.send_message->data.raw.slice_buffer, op->flags); + &op->data.send_message.send_message->data.raw.slice_buffer, + op->flags); /* If the outgoing buffer is already compressed, mark it as so in the flags. These will be picked up by the compression filter and further (wasteful) attempts at compression skipped. */ - if (op->data.send_message->data.raw.compression > GRPC_COMPRESS_NONE) { + if (op->data.send_message.send_message->data.raw.compression + > GRPC_COMPRESS_NONE) { call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; } stream_op->send_message = &call->sending_stream.base; @@ -1565,7 +1567,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, that case we're not necessarily covered by a poller. */ stream_op->covered_by_poller = call->is_client; call->received_initial_metadata = 1; - call->buffered_metadata[0] = op->data.recv_initial_metadata; + call->buffered_metadata[0] = + op->data.recv_initial_metadata.recv_initial_metadata; grpc_closure_init(&call->receiving_initial_metadata_ready, receiving_initial_metadata_ready, bctl, grpc_schedule_on_exec_ctx); @@ -1588,7 +1591,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } call->receiving_message = 1; bctl->recv_message = 1; - call->receiving_buffer = op->data.recv_message; + call->receiving_buffer = op->data.recv_message.recv_message; stream_op->recv_message = &call->receiving_stream; grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, bctl, grpc_schedule_on_exec_ctx); diff --git a/src/core/lib/surface/call_log_batch.c b/src/core/lib/surface/call_log_batch.c index 31c074f15d..61b73a138f 100644 --- a/src/core/lib/surface/call_log_batch.c +++ b/src/core/lib/surface/call_log_batch.c @@ -63,7 +63,8 @@ char *grpc_op_string(const grpc_op *op) { op->data.send_initial_metadata.count); break; case GRPC_OP_SEND_MESSAGE: - gpr_asprintf(&tmp, "SEND_MESSAGE ptr=%p", op->data.send_message); + gpr_asprintf(&tmp, "SEND_MESSAGE ptr=%p", + op->data.send_message.send_message); gpr_strvec_add(&b, tmp); break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: @@ -79,11 +80,12 @@ char *grpc_op_string(const grpc_op *op) { break; case GRPC_OP_RECV_INITIAL_METADATA: gpr_asprintf(&tmp, "RECV_INITIAL_METADATA ptr=%p", - op->data.recv_initial_metadata); + op->data.recv_initial_metadata.recv_initial_metadata); gpr_strvec_add(&b, tmp); break; case GRPC_OP_RECV_MESSAGE: - gpr_asprintf(&tmp, "RECV_MESSAGE ptr=%p", op->data.recv_message); + gpr_asprintf(&tmp, "RECV_MESSAGE ptr=%p", + op->data.recv_message.recv_message); gpr_strvec_add(&b, tmp); break; case GRPC_OP_RECV_STATUS_ON_CLIENT: diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index addb7c4fbc..9e916465be 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -609,7 +609,7 @@ static void finish_start_new_rpc( grpc_op op; memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_MESSAGE; - op.data.recv_message = &calld->payload; + op.data.recv_message.recv_message = &calld->payload; grpc_closure_init(&calld->publish, publish_new_rpc, elem, grpc_schedule_on_exec_ctx); grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1, @@ -857,7 +857,8 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, grpc_op op; memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_INITIAL_METADATA; - op.data.recv_initial_metadata = &calld->initial_metadata; + op.data.recv_initial_metadata.recv_initial_metadata = + &calld->initial_metadata; grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem, grpc_schedule_on_exec_ctx); grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1, diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 946f5872c0..7239606c64 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -537,7 +537,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, ops[1].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); - ops[1].data.send_message = ctx->send_message; + ops[1].data.send_message.send_message = ctx->send_message; ops[1].flags = write_flags; ops[1].reserved = NULL; @@ -546,12 +546,13 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, ops[2].reserved = NULL; ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[3].data.recv_initial_metadata.recv_initial_metadata = + &(ctx->recv_initial_metadata); ops[3].flags = 0; ops[3].reserved = NULL; ops[4].op = GRPC_OP_RECV_MESSAGE; - ops[4].data.recv_message = &(ctx->recv_message); + ops[4].data.recv_message.recv_message = &(ctx->recv_message); ops[4].flags = 0; ops[4].reserved = NULL; @@ -590,12 +591,13 @@ grpcsharp_call_start_client_streaming(grpc_call *call, ops[0].reserved = NULL; ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[1].data.recv_initial_metadata.recv_initial_metadata = + &(ctx->recv_initial_metadata); ops[1].flags = 0; ops[1].reserved = NULL; ops[2].op = GRPC_OP_RECV_MESSAGE; - ops[2].data.recv_message = &(ctx->recv_message); + ops[2].data.recv_message.recv_message = &(ctx->recv_message); ops[2].flags = 0; ops[2].reserved = NULL; @@ -634,7 +636,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( ops[1].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); - ops[1].data.send_message = ctx->send_message; + ops[1].data.send_message.send_message = ctx->send_message; ops[1].flags = write_flags; ops[1].reserved = NULL; @@ -698,7 +700,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata( /* TODO: don't use magic number */ grpc_op ops[1]; ops[0].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[0].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[0].data.recv_initial_metadata.recv_initial_metadata = + &(ctx->recv_initial_metadata); ops[0].flags = 0; ops[0].reserved = NULL; @@ -717,7 +720,7 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, size_t nops = send_empty_initial_metadata ? 2 : 1; ops[0].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); - ops[0].data.send_message = ctx->send_message; + ops[0].data.send_message.send_message = ctx->send_message; ops[0].flags = write_flags; ops[0].reserved = NULL; ops[1].op = GRPC_OP_SEND_INITIAL_METADATA; @@ -765,7 +768,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( ops[nops].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(optional_send_buffer, optional_send_buffer_len); - ops[nops].data.send_message = ctx->send_message; + ops[nops].data.send_message.send_message = ctx->send_message; ops[nops].flags = write_flags; ops[nops].reserved = NULL; nops ++; @@ -784,7 +787,7 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) { /* TODO: don't use magic number */ grpc_op ops[1]; ops[0].op = GRPC_OP_RECV_MESSAGE; - ops[0].data.recv_message = &(ctx->recv_message); + ops[0].data.recv_message.recv_message = &(ctx->recv_message); ops[0].flags = 0; ops[0].reserved = NULL; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 191e763e0e..96f66a48d2 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -262,7 +262,7 @@ class SendMessageOp : public Op { } } send_message = BufferToByteBuffer(value); - out->data.send_message = send_message; + out->data.send_message.send_message = send_message; PersistentValue *handle = new PersistentValue(value); resources->handles.push_back(unique_ptr(handle)); return true; @@ -377,7 +377,7 @@ class GetMetadataOp : public Op { bool ParseOp(Local value, grpc_op *out, shared_ptr resources) { - out->data.recv_initial_metadata = &recv_metadata; + out->data.recv_initial_metadata.recv_initial_metadata = &recv_metadata; return true; } bool IsFinalOp() { @@ -410,7 +410,7 @@ class ReadMessageOp : public Op { bool ParseOp(Local value, grpc_op *out, shared_ptr resources) { - out->data.recv_message = &recv_message; + out->data.recv_message.recv_message = &recv_message; return true; } bool IsFinalOp() { diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index 38fcae0299..45d0aae203 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -105,14 +105,14 @@ } if (self = [super init]) { _op.op = GRPC_OP_SEND_MESSAGE; - _op.data.send_message = message.grpc_byteBuffer; + _op.data.send_message.send_message = message.grpc_byteBuffer; _handler = handler; } return self; } - (void)dealloc { - grpc_byte_buffer_destroy(_op.data.send_message); + grpc_byte_buffer_destroy(_op.data.send_message.send_message); } @end @@ -145,7 +145,7 @@ if (self = [super init]) { _op.op = GRPC_OP_RECV_INITIAL_METADATA; grpc_metadata_array_init(&_headers); - _op.data.recv_initial_metadata = &_headers; + _op.data.recv_initial_metadata.recv_initial_metadata = &_headers; if (handler) { // Prevent reference cycle with _handler __weak typeof(self) weakSelf = self; @@ -177,7 +177,7 @@ - (instancetype)initWithHandler:(void (^)(grpc_byte_buffer *))handler { if (self = [super init]) { _op.op = GRPC_OP_RECV_MESSAGE; - _op.data.recv_message = &_receivedMessage; + _op.data.recv_message.recv_message = &_receivedMessage; if (handler) { // Prevent reference cycle with _handler __weak typeof(self) weakSelf = self; diff --git a/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m b/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m index dbd28076dd..43b91a072b 100644 --- a/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m +++ b/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m @@ -142,7 +142,7 @@ static void drain_cq(grpc_completion_queue *cq) { op->reserved = NULL; op++; op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message = request_payload; + op->data.send_message.send_message = request_payload; op->flags = 0; op->reserved = NULL; op++; @@ -151,12 +151,12 @@ static void drain_cq(grpc_completion_queue *cq) { op->reserved = NULL; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &initial_metadata_recv; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; op->flags = 0; op->reserved = NULL; op++; op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &response_payload_recv; + op->data.recv_message.recv_message = &response_payload_recv; op->flags = 0; op->reserved = NULL; op++; diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index 64b1137c2a..be11e63d62 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -335,7 +335,7 @@ PHP_METHOD(Call, startBatch) { 1 TSRMLS_CC); goto cleanup; } - ops[op_num].data.send_message = + ops[op_num].data.send_message.send_message = string_to_byte_buffer(Z_STRVAL_P(message_value), Z_STRLEN_P(message_value)); break; @@ -390,10 +390,11 @@ PHP_METHOD(Call, startBatch) { } break; case GRPC_OP_RECV_INITIAL_METADATA: - ops[op_num].data.recv_initial_metadata = &recv_metadata; + ops[op_num].data.recv_initial_metadata.recv_initial_metadata = + &recv_metadata; break; case GRPC_OP_RECV_MESSAGE: - ops[op_num].data.recv_message = &message; + ops[op_num].data.recv_message.recv_message = &message; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: ops[op_num].data.recv_status_on_client.trailing_metadata = @@ -498,7 +499,7 @@ cleanup: } for (int i = 0; i < op_num; i++) { if (ops[i].op == GRPC_OP_SEND_MESSAGE) { - grpc_byte_buffer_destroy(ops[i].data.send_message); + grpc_byte_buffer_destroy(ops[i].data.send_message.send_message); } if (ops[i].op == GRPC_OP_RECV_MESSAGE) { grpc_byte_buffer_destroy(message); diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 69b837c4db..e07a2f873c 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -606,7 +606,7 @@ def operation_send_message(data, int flags): op.c_op.type = GRPC_OP_SEND_MESSAGE op.c_op.flags = flags byte_buffer = ByteBuffer(data) - op.c_op.data.send_message = byte_buffer.c_byte_buffer + op.c_op.data.send_message.send_message = byte_buffer.c_byte_buffer op.references.append(byte_buffer) op.is_valid = True return op @@ -639,7 +639,7 @@ def operation_receive_initial_metadata(int flags): op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA op.c_op.flags = flags op._received_metadata = Metadata([]) - op.c_op.data.receive_initial_metadata = ( + op.c_op.data.recv_initial_metadata.receive_initial_metadata = ( &op._received_metadata.c_metadata_array) op.is_valid = True return op @@ -652,7 +652,8 @@ def operation_receive_message(int flags): # n.b. the c_op.data.receive_message field needs to be deleted by us, # anyway, so we just let that be handled by the ByteBuffer() we allocated # the line before. - op.c_op.data.receive_message = &op._received_message.c_byte_buffer + op.c_op.data.recv_message.receive_message = + &op._received_message.c_byte_buffer op.is_valid = True return op diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 67a42af619..fbc68c4e59 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -641,7 +641,7 @@ static void grpc_run_batch_stack_cleanup(run_batch_stack *st) { for (i = 0; i < st->op_num; i++) { if (st->ops[i].op == GRPC_OP_SEND_MESSAGE) { - grpc_byte_buffer_destroy(st->ops[i].data.send_message); + grpc_byte_buffer_destroy(st->ops[i].data.send_message.send_message); } } } @@ -673,8 +673,9 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { st->send_metadata.metadata; break; case GRPC_OP_SEND_MESSAGE: - st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer( - RSTRING_PTR(this_value), RSTRING_LEN(this_value)); + st->ops[st->op_num].data.send_message.send_message = + grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value), + RSTRING_LEN(this_value)); st->ops[st->op_num].flags = st->write_flag; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: @@ -686,10 +687,11 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { &st->ops[st->op_num], &st->send_trailing_metadata, this_value); break; case GRPC_OP_RECV_INITIAL_METADATA: - st->ops[st->op_num].data.recv_initial_metadata = &st->recv_metadata; + st->ops[st->op_num].data.recv_initial_metadata.recv_initial_metadata = + &st->recv_metadata; break; case GRPC_OP_RECV_MESSAGE: - st->ops[st->op_num].data.recv_message = &st->recv_message; + st->ops[st->op_num].data.recv_message.recv_message = &st->recv_message; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: st->ops[st->op_num].data.recv_status_on_client.trailing_metadata = -- cgit v1.2.3 From 0a2fae9aedeef98b39c5f825cb28b5ca32ecc6a2 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 1 Feb 2017 14:49:03 -0800 Subject: Dynamically enable packet coalescing by channel args --- BUILD | 1 + build.yaml | 1 + gRPC-Core.podspec | 3 +- include/grpc/impl/codegen/grpc_types.h | 4 + .../cronet/client/secure/cronet_channel_create.c | 13 +- .../transport/cronet/transport/cronet_transport.c | 181 +++++++++++++-------- .../transport/cronet/transport/cronet_transport.h | 43 +++++ .../tests/CronetUnitTests/CronetUnitTests.m | 20 ++- src/objective-c/tests/Podfile | 1 - templates/gRPC-Core.podspec.template | 3 +- tools/run_tests/generated/sources_and_headers.json | 4 +- 11 files changed, 186 insertions(+), 88 deletions(-) create mode 100644 src/core/ext/transport/cronet/transport/cronet_transport.h (limited to 'include/grpc/impl') diff --git a/BUILD b/BUILD index 54192514cc..e5c2d20d25 100644 --- a/BUILD +++ b/BUILD @@ -1028,6 +1028,7 @@ grpc_cc_library( ], hdrs = [ "third_party/Cronet/bidirectional_stream_c.h", + "src/core/ext/transport/cronet/transport/cronet_transport.h", ], language = "c", public_hdrs = [ diff --git a/build.yaml b/build.yaml index 23e2659ea1..63c05b76c6 100644 --- a/build.yaml +++ b/build.yaml @@ -684,6 +684,7 @@ filegroups: - include/grpc/grpc_security.h - include/grpc/grpc_security_constants.h headers: + - src/core/ext/transport/cronet/transport/cronet_transport.h - third_party/Cronet/bidirectional_stream_c.h src: - src/core/ext/transport/cronet/client/secure/cronet_channel_create.c diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 1eb178931d..57816d066d 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -848,7 +848,8 @@ Pod::Spec.new do |s| s.subspec 'Cronet-Interface' do |ss| ss.header_mappings_dir = 'include/grpc' - ss.source_files = 'include/grpc/grpc_cronet.h' + ss.source_files = 'include/grpc/grpc_cronet.h', + 'src/core/ext/transport/cronet/transport/cronet_transport.h' end s.subspec 'Cronet-Implementation' do |ss| diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index ee8101aab8..74be7d0ccf 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -217,6 +217,10 @@ typedef struct { #define GRPC_ARG_LB_POLICY_NAME "grpc.lb_policy_name" /** The grpc_socket_mutator instance that set the socket options. A pointer. */ #define GRPC_ARG_SOCKET_MUTATOR "grpc.socket_mutator" +/** If non-zero, Cronet transport will coalesce packets to fewer frames when + * possible. */ +#define GRPC_ARG_USE_CRONET_PACKET_COALESCING \ + "grpc.use_cronet_packet_coalescing" /** \} */ /** Result of a grpc call. If the caller satisfies the prerequisites of a diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c index 477cf07f45..b6e9e845df 100644 --- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c @@ -39,6 +39,7 @@ #include #include +#include "src/core/ext/transport/cronet/transport/cronet_transport.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/transport_impl.h" @@ -54,16 +55,14 @@ extern grpc_transport_vtable grpc_cronet_vtable; GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( void *engine, const char *target, const grpc_channel_args *args, void *reserved) { - cronet_transport *ct = gpr_malloc(sizeof(cronet_transport)); - ct->base.vtable = &grpc_cronet_vtable; - ct->engine = engine; - ct->host = gpr_malloc(strlen(target) + 1); - strcpy(ct->host, target); gpr_log(GPR_DEBUG, "grpc_create_cronet_transport: stream_engine = %p, target=%s", engine, - ct->host); + target); + + grpc_transport *ct = + grpc_create_cronet_transport(engine, target, args, reserved); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; return grpc_channel_create(&exec_ctx, target, args, - GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct); + GRPC_CLIENT_DIRECT_CHANNEL, ct); } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 447f3f31ec..cba5365bad 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -112,6 +112,7 @@ struct grpc_cronet_transport { grpc_transport base; /* must be first element in this structure */ stream_engine *engine; char *host; + bool use_packet_coalescing; }; typedef struct grpc_cronet_transport grpc_cronet_transport; @@ -150,10 +151,8 @@ struct op_state { bool state_callback_received[OP_NUM_OPS]; bool fail_state; bool flush_read; -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING bool flush_cronet_when_ready; bool pending_write_for_trailer; -#endif bool unprocessed_send_message; grpc_error *cancel_error; /* data structure for storing data coming from server */ @@ -178,7 +177,7 @@ struct op_storage { struct stream_obj { struct op_and_state *oas; grpc_transport_stream_op *curr_op; - grpc_cronet_transport curr_ct; + grpc_cronet_transport *curr_ct; grpc_stream *curr_gs; bidirectional_stream *cbs; bidirectional_stream_header_array header_array; @@ -415,6 +414,7 @@ static void on_succeeded(bidirectional_stream *stream) { static void on_stream_ready(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; + grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; gpr_mu_lock(&s->mu); s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true; s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true; @@ -425,12 +425,12 @@ static void on_stream_ready(bidirectional_stream *stream) { } /* Send the initial metadata on wire if there is no SEND_MESSAGE or * SEND_TRAILING_METADATA ops pending */ -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - if (s->state.flush_cronet_when_ready) { - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); - bidirectional_stream_flush(stream); + if (t->use_packet_coalescing) { + if (s->state.flush_cronet_when_ready) { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(stream); + } } -#endif gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -540,6 +540,7 @@ static void on_response_trailers_received( CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, trailers); stream_obj *s = (stream_obj *)stream->annotation; + grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; gpr_mu_lock(&s->mu); memset(&s->state.rs.trailing_metadata, 0, sizeof(s->state.rs.trailing_metadata)); @@ -568,10 +569,10 @@ static void on_response_trailers_received( CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); s->state.state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, "", 0, true); -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); - bidirectional_stream_flush(s->cbs); -#endif + if (t->use_packet_coalescing) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); + } s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; gpr_mu_unlock(&s->mu); @@ -695,8 +696,10 @@ static bool header_has_authority(grpc_linked_mdelem *head) { executed. This is the heart of the state machine. */ static bool op_can_be_run(grpc_transport_stream_op *curr_op, - struct op_state *stream_state, - struct op_state *op_state, enum e_op_id op_id) { + struct stream_obj *s, struct op_state *op_state, + enum e_op_id op_id) { + struct op_state *stream_state = &s->state; + grpc_cronet_transport *t = s->curr_ct; bool result = true; /* When call is canceled, every op can be run, except under following conditions @@ -768,11 +771,9 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, result = false; /* we haven't got on_write_completed for the send yet */ else if (stream_state->state_op_done[OP_SEND_MESSAGE] && - !stream_state->state_callback_received[OP_SEND_MESSAGE] -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - && !stream_state->pending_write_for_trailer -#endif - ) + !stream_state->state_callback_received[OP_SEND_MESSAGE] && + !(t->use_packet_coalescing && + stream_state->pending_write_for_trailer)) result = false; } else if (op_id == OP_CANCEL_ERROR) { /* already executed */ @@ -845,42 +846,41 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, struct op_and_state *oas) { grpc_transport_stream_op *stream_op = &oas->op; struct stream_obj *s = oas->s; + grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; struct op_state *stream_state = &s->state; enum e_op_result result = NO_ACTION_POSSIBLE; if (stream_op->send_initial_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_SEND_INITIAL_METADATA)) { + op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas); /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled, * on_failed */ GPR_ASSERT(s->cbs == NULL); GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]); - s->cbs = bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, + s->cbs = bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks); CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs); -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - bidirectional_stream_disable_auto_flush(s->cbs, true); - bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); -#endif + if (t->use_packet_coalescing) { + bidirectional_stream_disable_auto_flush(s->cbs, true); + bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); + } char *url = NULL; const char *method = "POST"; s->header_array.headers = NULL; convert_metadata_to_cronet_headers( - stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url, + stream_op->send_initial_metadata->list.head, t->host, &url, &s->header_array.headers, &s->header_array.count, &method); s->header_array.capacity = s->header_array.count; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - if (!stream_op->send_message && !stream_op->send_trailing_metadata) { - s->state.flush_cronet_when_ready = true; + if (t->use_packet_coalescing) { + if (!stream_op->send_message && !stream_op->send_trailing_metadata) { + s->state.flush_cronet_when_ready = true; + } } -#endif result = ACTION_TAKEN_WITH_CALLBACK; } else if (stream_op->send_message && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_SEND_MESSAGE)) { + op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); stream_state->unprocessed_send_message = false; if (stream_state->state_callback_received[OP_FAILED]) { @@ -913,19 +913,18 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, (int)write_buffer_size, false); -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - if (!stream_op->send_trailing_metadata) { - CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", - s->cbs); - bidirectional_stream_flush(s->cbs); - result = ACTION_TAKEN_WITH_CALLBACK; + if (t->use_packet_coalescing) { + if (!stream_op->send_trailing_metadata) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); + result = ACTION_TAKEN_WITH_CALLBACK; + } else { + stream_state->pending_write_for_trailer = true; + result = ACTION_TAKEN_NO_CALLBACK; + } } else { - stream_state->pending_write_for_trailer = true; - result = ACTION_TAKEN_NO_CALLBACK; + result = ACTION_TAKEN_WITH_CALLBACK; } -#else - result = ACTION_TAKEN_WITH_CALLBACK; -#endif } else { result = NO_ACTION_POSSIBLE; } @@ -933,7 +932,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_op_done[OP_SEND_MESSAGE] = true; oas->state.state_op_done[OP_SEND_MESSAGE] = true; } else if (stream_op->send_trailing_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, + op_can_be_run(stream_op, s, &oas->state, OP_SEND_TRAILING_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); if (stream_state->state_callback_received[OP_FAILED]) { @@ -944,15 +943,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, s->cbs); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, "", 0, true); -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); - bidirectional_stream_flush(s->cbs); -#endif + if (t->use_packet_coalescing) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); + } result = ACTION_TAKEN_WITH_CALLBACK; } stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; } else if (stream_op->recv_initial_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, + op_can_be_run(stream_op, s, &oas->state, OP_RECV_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { @@ -971,8 +970,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_op->recv_message && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_RECV_MESSAGE)) { + op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); @@ -1084,7 +1082,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_op->recv_trailing_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, + op_can_be_run(stream_op, s, &oas->state, OP_RECV_TRAILING_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas); if (oas->s->state.rs.trailing_metadata_valid) { @@ -1096,8 +1094,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_op->cancel_error && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_CANCEL_ERROR)) { + op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); if (s->cbs) { @@ -1111,8 +1108,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error); } } else if (stream_op->on_complete && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_ON_COMPLETE)) { + op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { grpc_closure_sched(exec_ctx, stream_op->on_complete, @@ -1176,10 +1172,12 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, sizeof(s->state.state_callback_received)); s->state.fail_state = s->state.flush_read = false; s->state.cancel_error = NULL; -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; -#endif s->state.unprocessed_send_message = false; + + s->curr_gs = gs; + s->curr_ct = (grpc_cronet_transport *)gt; + gpr_mu_init(&s->mu); return 0; } @@ -1195,8 +1193,6 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op *op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); stream_obj *s = (stream_obj *)gs; - s->curr_gs = gs; - memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport)); add_to_storage(s, op); if (op->send_initial_metadata && header_has_authority(op->send_initial_metadata->list.head)) { @@ -1244,14 +1240,55 @@ static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) {} -const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), - "cronet_http", - init_stream, - set_pollset_do_nothing, - set_pollset_set_do_nothing, - perform_stream_op, - perform_op, - destroy_stream, - destroy_transport, - get_peer, - get_endpoint}; +static const grpc_transport_vtable grpc_cronet_vtable = { + sizeof(stream_obj), + "cronet_http", + init_stream, + set_pollset_do_nothing, + set_pollset_set_do_nothing, + perform_stream_op, + perform_op, + destroy_stream, + destroy_transport, + get_peer, + get_endpoint}; + +grpc_transport *grpc_create_cronet_transport(void *engine, const char *target, + const grpc_channel_args *args, + void *reserved) { + grpc_cronet_transport *ct = gpr_malloc(sizeof(grpc_cronet_transport)); + if (!ct) { + goto error; + } + ct->base.vtable = &grpc_cronet_vtable; + ct->engine = engine; + ct->host = gpr_malloc(strlen(target) + 1); + if (!ct->host) { + goto error; + } + strcpy(ct->host, target); + + ct->use_packet_coalescing = true; + for (size_t i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) { + if (args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", + GRPC_ARG_USE_CRONET_PACKET_COALESCING); + } else { + ct->use_packet_coalescing = (args->args[i].value.integer != 0); + } + } + } + + return &ct->base; + +error: + if (ct) { + if (ct->host) { + gpr_free(ct->host); + } + gpr_free(ct); + } + + return NULL; +} \ No newline at end of file diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.h b/src/core/ext/transport/cronet/transport/cronet_transport.h new file mode 100644 index 0000000000..169ce31fd7 --- /dev/null +++ b/src/core/ext/transport/cronet/transport/cronet_transport.h @@ -0,0 +1,43 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H +#define GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H + +#include "src/core/lib/transport/transport.h" + +grpc_transport *grpc_create_cronet_transport(void *engine, const char *target, + const grpc_channel_args *args, + void *reserved); + +#endif /* GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H */ diff --git a/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m b/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m index dcd7f2fa8d..d06fe7767f 100644 --- a/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m +++ b/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m @@ -269,7 +269,12 @@ unsigned int parse_h2_length(const char *field) { grpc_completion_queue_destroy(cq); } -- (void)testPacketCoalescing { +- (void)PacketCoalescing:(bool)use_coalescing { + grpc_arg arg; + arg.key = GRPC_ARG_USE_CRONET_PACKET_COALESCING; + arg.type = GRPC_ARG_INTEGER; + arg.value.integer = use_coalescing ? 1:0; + grpc_channel_args *args = grpc_channel_args_copy_and_add(NULL, &arg, 1); grpc_call *c; grpc_slice request_payload_slice = grpc_slice_from_copied_string("hello world"); @@ -285,8 +290,8 @@ unsigned int parse_h2_length(const char *field) { gpr_join_host_port(&addr, "127.0.0.1", port); grpc_completion_queue *cq = grpc_completion_queue_create(NULL); stream_engine *cronetEngine = [Cronet getGlobalEngine]; - grpc_channel *client = grpc_cronet_secure_channel_create(cronetEngine, addr, - NULL, NULL); + grpc_channel *client = + grpc_cronet_secure_channel_create(cronetEngine, addr, args, NULL); cq_verifier *cqv = cq_verifier_create(cq); grpc_op ops[6]; @@ -379,7 +384,7 @@ unsigned int parse_h2_length(const char *field) { long len; bool coalesced = false; while ((len = SSL_read(ssl, buf, sizeof(buf))) > 0) { - NSLog(@"Read len: %ld", len); + gpr_log(GPR_DEBUG, "Read len: %ld", len); // Analyze the HTTP/2 frames in the same TLS PDU to identify if // coalescing is successful @@ -404,7 +409,7 @@ unsigned int parse_h2_length(const char *field) { } } - XCTAssert(coalesced); + XCTAssert(coalesced == use_coalescing); SSL_free(ssl); SSL_CTX_free(ctx); close(s); @@ -433,4 +438,9 @@ unsigned int parse_h2_length(const char *field) { grpc_completion_queue_destroy(cq); } +- (void)testPacketCoalescing { + [self PacketCoalescing:true]; + [self PacketCoalescing:false]; +} + @end diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile index 462c6a8e0e..3760330be9 100644 --- a/src/objective-c/tests/Podfile +++ b/src/objective-c/tests/Podfile @@ -97,7 +97,6 @@ post_install do |installer| # GPR_UNREACHABLE_CODE causes "Control may reach end of non-void # function" warning config.build_settings['GCC_WARN_ABOUT_RETURN_TYPE'] = 'NO' - config.build_settings['GCC_PREPROCESSOR_DEFINITIONS'] = '$(inherited) COCOAPODS=1 GRPC_CRONET_WITH_PACKET_COALESCING=1' end end diff --git a/templates/gRPC-Core.podspec.template b/templates/gRPC-Core.podspec.template index 1b97d18f16..0738b7221b 100644 --- a/templates/gRPC-Core.podspec.template +++ b/templates/gRPC-Core.podspec.template @@ -161,7 +161,8 @@ s.subspec 'Cronet-Interface' do |ss| ss.header_mappings_dir = 'include/grpc' - ss.source_files = 'include/grpc/grpc_cronet.h' + ss.source_files = 'include/grpc/grpc_cronet.h', + 'src/core/ext/transport/cronet/transport/cronet_transport.h' end s.subspec 'Cronet-Implementation' do |ss| diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 9bc82486d2..7f59411019 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7873,6 +7873,7 @@ "include/grpc/grpc_cronet.h", "include/grpc/grpc_security.h", "include/grpc/grpc_security_constants.h", + "src/core/ext/transport/cronet/transport/cronet_transport.h", "third_party/Cronet/bidirectional_stream_c.h" ], "is_filegroup": true, @@ -7884,7 +7885,8 @@ "include/grpc/grpc_security_constants.h", "src/core/ext/transport/cronet/client/secure/cronet_channel_create.c", "src/core/ext/transport/cronet/transport/cronet_api_dummy.c", - "src/core/ext/transport/cronet/transport/cronet_transport.c" + "src/core/ext/transport/cronet/transport/cronet_transport.c", + "src/core/ext/transport/cronet/transport/cronet_transport.h" ], "third_party": false, "type": "filegroup" -- cgit v1.2.3 From f3dec9c995e5c2b85460107a9ad9d937facd9a49 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 21 Feb 2017 10:02:31 -0800 Subject: Add counters for rmw atomic operations to microbenchmarks --- Makefile | 2 +- build.yaml | 2 +- include/grpc/impl/codegen/atm_gcc_atomic.h | 30 ++++++++++++------ src/core/lib/support/sync_posix.c | 9 +++--- test/cpp/microbenchmarks/bm_closure.cc | 51 ++++++++++++++++++++++++++++++ test/cpp/microbenchmarks/bm_fullstack.cc | 20 ++++++++---- 6 files changed, 92 insertions(+), 22 deletions(-) (limited to 'include/grpc/impl') diff --git a/Makefile b/Makefile index 8f92816672..d47d967e47 100644 --- a/Makefile +++ b/Makefile @@ -217,7 +217,7 @@ CC_counters = $(DEFAULT_CC) CXX_counters = $(DEFAULT_CXX) LD_counters = $(DEFAULT_CC) LDXX_counters = $(DEFAULT_CXX) -CPPFLAGS_counters = -O2 -DGPR_MU_COUNTERS +CPPFLAGS_counters = -O2 -DGPR_LOW_LEVEL_COUNTERS DEFINES_counters = NDEBUG diff --git a/build.yaml b/build.yaml index 141526cb59..120c7a85b1 100644 --- a/build.yaml +++ b/build.yaml @@ -3919,7 +3919,7 @@ configs: CPPFLAGS: -O2 -DGRPC_BASIC_PROFILER -DGRPC_TIMERS_RDTSC DEFINES: NDEBUG counters: - CPPFLAGS: -O2 -DGPR_MU_COUNTERS + CPPFLAGS: -O2 -DGPR_LOW_LEVEL_COUNTERS DEFINES: NDEBUG dbg: CPPFLAGS: -O0 diff --git a/include/grpc/impl/codegen/atm_gcc_atomic.h b/include/grpc/impl/codegen/atm_gcc_atomic.h index 7d4ae98cf7..e5a623f723 100644 --- a/include/grpc/impl/codegen/atm_gcc_atomic.h +++ b/include/grpc/impl/codegen/atm_gcc_atomic.h @@ -40,6 +40,15 @@ typedef intptr_t gpr_atm; +#ifdef GPR_LOW_LEVEL_COUNTERS +extern gpr_atm gpr_counter_rmw; +#define GPR_ATM_INC_COUNTER(counter) \ + __atomic_fetch_add(&counter, 1, __ATOMIC_RELAXED) +#define GPR_ATM_INC_RMW_THEN(blah) (GPR_ATM_INC_COUNTER(gpr_counter_rmw), blah) +#else +#define GPR_ATM_INC_RMW_THEN(blah) blah +#endif + #define gpr_atm_full_barrier() (__atomic_thread_fence(__ATOMIC_SEQ_CST)) #define gpr_atm_acq_load(p) (__atomic_load_n((p), __ATOMIC_ACQUIRE)) @@ -50,25 +59,28 @@ typedef intptr_t gpr_atm; (__atomic_store_n((p), (intptr_t)(value), __ATOMIC_RELAXED)) #define gpr_atm_no_barrier_fetch_add(p, delta) \ - (__atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_RELAXED)) + GPR_ATM_INC_RMW_THEN( \ + __atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_RELAXED)) #define gpr_atm_full_fetch_add(p, delta) \ - (__atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_ACQ_REL)) + GPR_ATM_INC_RMW_THEN( \ + __atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_ACQ_REL)) static __inline int gpr_atm_no_barrier_cas(gpr_atm *p, gpr_atm o, gpr_atm n) { - return __atomic_compare_exchange_n(p, &o, n, 0, __ATOMIC_RELAXED, - __ATOMIC_RELAXED); + return GPR_ATM_INC_RMW_THEN(__atomic_compare_exchange_n( + p, &o, n, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); } static __inline int gpr_atm_acq_cas(gpr_atm *p, gpr_atm o, gpr_atm n) { - return __atomic_compare_exchange_n(p, &o, n, 0, __ATOMIC_ACQUIRE, - __ATOMIC_RELAXED); + return GPR_ATM_INC_RMW_THEN(__atomic_compare_exchange_n( + p, &o, n, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)); } static __inline int gpr_atm_rel_cas(gpr_atm *p, gpr_atm o, gpr_atm n) { - return __atomic_compare_exchange_n(p, &o, n, 0, __ATOMIC_RELEASE, - __ATOMIC_RELAXED); + return GPR_ATM_INC_RMW_THEN(__atomic_compare_exchange_n( + p, &o, n, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)); } -#define gpr_atm_full_xchg(p, n) __atomic_exchange_n((p), (n), __ATOMIC_ACQ_REL) +#define gpr_atm_full_xchg(p, n) \ + GPR_ATM_INC_RMW_THEN(__atomic_exchange_n((p), (n), __ATOMIC_ACQ_REL)) #endif /* GRPC_IMPL_CODEGEN_ATM_GCC_ATOMIC_H */ diff --git a/src/core/lib/support/sync_posix.c b/src/core/lib/support/sync_posix.c index de0f0484b5..3b7d780608 100644 --- a/src/core/lib/support/sync_posix.c +++ b/src/core/lib/support/sync_posix.c @@ -42,8 +42,9 @@ #include #include "src/core/lib/profiling/timers.h" -#ifdef GPR_MU_COUNTERS -gpr_atm grpc_mu_locks = 0; +#ifdef GPR_LOW_LEVEL_COUNTERS +gpr_atm gpr_mu_locks = 0; +gpr_atm gpr_counter_rmw = 0; #endif void gpr_mu_init(gpr_mu* mu) { GPR_ASSERT(pthread_mutex_init(mu, NULL) == 0); } @@ -51,8 +52,8 @@ void gpr_mu_init(gpr_mu* mu) { GPR_ASSERT(pthread_mutex_init(mu, NULL) == 0); } void gpr_mu_destroy(gpr_mu* mu) { GPR_ASSERT(pthread_mutex_destroy(mu) == 0); } void gpr_mu_lock(gpr_mu* mu) { -#ifdef GPR_MU_COUNTERS - gpr_atm_no_barrier_fetch_add(&grpc_mu_locks, 1); +#ifdef GPR_LOW_LEVEL_COUNTERS + GPR_ATM_INC_COUNTER(gpr_mu_locks); #endif GPR_TIMER_BEGIN("gpr_mu_lock", 0); GPR_ASSERT(pthread_mutex_lock(mu) == 0); diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc index 80d6610e13..16d05781bb 100644 --- a/test/cpp/microbenchmarks/bm_closure.cc +++ b/test/cpp/microbenchmarks/bm_closure.cc @@ -43,13 +43,46 @@ extern "C" { #include "third_party/benchmark/include/benchmark/benchmark.h" +#include + +#ifdef GPR_LOW_LEVEL_COUNTERS +extern "C" gpr_atm gpr_mu_locks; +#endif + static class InitializeStuff { public: InitializeStuff() { grpc_init(); } ~InitializeStuff() { grpc_shutdown(); } } initialize_stuff; +class TrackCounters { + public: + TrackCounters(benchmark::State& state) : state_(state) {} + + ~TrackCounters() { + std::ostringstream out; +#ifdef GPR_LOW_LEVEL_COUNTERS + out << " locks/iter:" << ((double)(gpr_atm_no_barrier_load(&gpr_mu_locks) - + mu_locks_at_start_) / + (double)state_.iterations()) + << " atm_rmw/iter:" + << ((double)(gpr_atm_no_barrier_load(&gpr_counter_rmw) - + rmw_at_start_) / + (double)state_.iterations()); +#endif + state_.SetLabel(out.str()); + } + + private: + benchmark::State& state_; +#ifdef GPR_LOW_LEVEL_COUNTERS + const size_t mu_locks_at_start_ = gpr_atm_no_barrier_load(&gpr_mu_locks); + const size_t rmw_at_start_ = gpr_atm_no_barrier_load(&gpr_counter_rmw); +#endif +}; + static void BM_NoOpExecCtx(benchmark::State& state) { + TrackCounters track_counters(state); while (state.KeepRunning()) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx_finish(&exec_ctx); @@ -58,6 +91,7 @@ static void BM_NoOpExecCtx(benchmark::State& state) { BENCHMARK(BM_NoOpExecCtx); static void BM_WellFlushed(benchmark::State& state) { + TrackCounters track_counters(state); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { grpc_exec_ctx_flush(&exec_ctx); @@ -69,6 +103,7 @@ BENCHMARK(BM_WellFlushed); static void DoNothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {} static void BM_ClosureInitAgainstExecCtx(benchmark::State& state) { + TrackCounters track_counters(state); grpc_closure c; while (state.KeepRunning()) { benchmark::DoNotOptimize( @@ -78,6 +113,7 @@ static void BM_ClosureInitAgainstExecCtx(benchmark::State& state) { BENCHMARK(BM_ClosureInitAgainstExecCtx); static void BM_ClosureInitAgainstCombiner(benchmark::State& state) { + TrackCounters track_counters(state); grpc_combiner* combiner = grpc_combiner_create(NULL); grpc_closure c; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -91,6 +127,7 @@ static void BM_ClosureInitAgainstCombiner(benchmark::State& state) { BENCHMARK(BM_ClosureInitAgainstCombiner); static void BM_ClosureRunOnExecCtx(benchmark::State& state) { + TrackCounters track_counters(state); grpc_closure c; grpc_closure_init(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -103,6 +140,7 @@ static void BM_ClosureRunOnExecCtx(benchmark::State& state) { BENCHMARK(BM_ClosureRunOnExecCtx); static void BM_ClosureCreateAndRun(benchmark::State& state) { + TrackCounters track_counters(state); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { grpc_closure_run(&exec_ctx, grpc_closure_create(DoNothing, NULL, @@ -114,6 +152,7 @@ static void BM_ClosureCreateAndRun(benchmark::State& state) { BENCHMARK(BM_ClosureCreateAndRun); static void BM_ClosureInitAndRun(benchmark::State& state) { + TrackCounters track_counters(state); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_closure c; while (state.KeepRunning()) { @@ -126,6 +165,7 @@ static void BM_ClosureInitAndRun(benchmark::State& state) { BENCHMARK(BM_ClosureInitAndRun); static void BM_ClosureSchedOnExecCtx(benchmark::State& state) { + TrackCounters track_counters(state); grpc_closure c; grpc_closure_init(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -138,6 +178,7 @@ static void BM_ClosureSchedOnExecCtx(benchmark::State& state) { BENCHMARK(BM_ClosureSchedOnExecCtx); static void BM_ClosureSched2OnExecCtx(benchmark::State& state) { + TrackCounters track_counters(state); grpc_closure c1; grpc_closure c2; grpc_closure_init(&c1, DoNothing, NULL, grpc_schedule_on_exec_ctx); @@ -153,6 +194,7 @@ static void BM_ClosureSched2OnExecCtx(benchmark::State& state) { BENCHMARK(BM_ClosureSched2OnExecCtx); static void BM_ClosureSched3OnExecCtx(benchmark::State& state) { + TrackCounters track_counters(state); grpc_closure c1; grpc_closure c2; grpc_closure c3; @@ -171,6 +213,7 @@ static void BM_ClosureSched3OnExecCtx(benchmark::State& state) { BENCHMARK(BM_ClosureSched3OnExecCtx); static void BM_AcquireMutex(benchmark::State& state) { + TrackCounters track_counters(state); // for comparison with the combiner stuff below gpr_mu mu; gpr_mu_init(&mu); @@ -185,6 +228,7 @@ static void BM_AcquireMutex(benchmark::State& state) { BENCHMARK(BM_AcquireMutex); static void BM_ClosureSchedOnCombiner(benchmark::State& state) { + TrackCounters track_counters(state); grpc_combiner* combiner = grpc_combiner_create(NULL); grpc_closure c; grpc_closure_init(&c, DoNothing, NULL, @@ -200,6 +244,7 @@ static void BM_ClosureSchedOnCombiner(benchmark::State& state) { BENCHMARK(BM_ClosureSchedOnCombiner); static void BM_ClosureSched2OnCombiner(benchmark::State& state) { + TrackCounters track_counters(state); grpc_combiner* combiner = grpc_combiner_create(NULL); grpc_closure c1; grpc_closure c2; @@ -219,6 +264,7 @@ static void BM_ClosureSched2OnCombiner(benchmark::State& state) { BENCHMARK(BM_ClosureSched2OnCombiner); static void BM_ClosureSched3OnCombiner(benchmark::State& state) { + TrackCounters track_counters(state); grpc_combiner* combiner = grpc_combiner_create(NULL); grpc_closure c1; grpc_closure c2; @@ -242,6 +288,7 @@ static void BM_ClosureSched3OnCombiner(benchmark::State& state) { BENCHMARK(BM_ClosureSched3OnCombiner); static void BM_ClosureSched2OnTwoCombiners(benchmark::State& state) { + TrackCounters track_counters(state); grpc_combiner* combiner1 = grpc_combiner_create(NULL); grpc_combiner* combiner2 = grpc_combiner_create(NULL); grpc_closure c1; @@ -263,6 +310,7 @@ static void BM_ClosureSched2OnTwoCombiners(benchmark::State& state) { BENCHMARK(BM_ClosureSched2OnTwoCombiners); static void BM_ClosureSched4OnTwoCombiners(benchmark::State& state) { + TrackCounters track_counters(state); grpc_combiner* combiner1 = grpc_combiner_create(NULL); grpc_combiner* combiner2 = grpc_combiner_create(NULL); grpc_closure c1; @@ -323,6 +371,7 @@ class Rescheduler { }; static void BM_ClosureReschedOnExecCtx(benchmark::State& state) { + TrackCounters track_counters(state); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; Rescheduler r(state, grpc_schedule_on_exec_ctx); r.ScheduleFirst(&exec_ctx); @@ -331,6 +380,7 @@ static void BM_ClosureReschedOnExecCtx(benchmark::State& state) { BENCHMARK(BM_ClosureReschedOnExecCtx); static void BM_ClosureReschedOnCombiner(benchmark::State& state) { + TrackCounters track_counters(state); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_combiner* combiner = grpc_combiner_create(NULL); Rescheduler r(state, grpc_combiner_scheduler(combiner, false)); @@ -342,6 +392,7 @@ static void BM_ClosureReschedOnCombiner(benchmark::State& state) { BENCHMARK(BM_ClosureReschedOnCombiner); static void BM_ClosureReschedOnCombinerFinally(benchmark::State& state) { + TrackCounters track_counters(state); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_combiner* combiner = grpc_combiner_create(NULL); Rescheduler r(state, grpc_combiner_finally_scheduler(combiner, false)); diff --git a/test/cpp/microbenchmarks/bm_fullstack.cc b/test/cpp/microbenchmarks/bm_fullstack.cc index c63de0ce0a..5bb456ab46 100644 --- a/test/cpp/microbenchmarks/bm_fullstack.cc +++ b/test/cpp/microbenchmarks/bm_fullstack.cc @@ -99,8 +99,9 @@ static void ApplyCommonChannelArguments(ChannelArguments* c) { c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX); } -#ifdef GPR_MU_COUNTERS -extern "C" gpr_atm grpc_mu_locks; +#ifdef GPR_LOW_LEVEL_COUNTERS +extern "C" gpr_atm gpr_mu_locks; +extern "C" gpr_atm gpr_counter_rmw; #endif class BaseFixture { @@ -108,10 +109,14 @@ class BaseFixture { void Finish(benchmark::State& s) { std::ostringstream out; this->AddToLabel(out, s); -#ifdef GPR_MU_COUNTERS - out << " locks/iter:" << ((double)(gpr_atm_no_barrier_load(&grpc_mu_locks) - +#ifdef GPR_LOW_LEVEL_COUNTERS + out << " locks/iter:" << ((double)(gpr_atm_no_barrier_load(&gpr_mu_locks) - mu_locks_at_start_) / - (double)s.iterations()); + (double)s.iterations()) + << " atm_rmw/iter:" + << ((double)(gpr_atm_no_barrier_load(&gpr_counter_rmw) - + rmw_at_start_) / + (double)s.iterations()); #endif grpc_memory_counters counters_at_end = grpc_memory_counters_snapshot(); out << " allocs/iter:" @@ -128,8 +133,9 @@ class BaseFixture { virtual void AddToLabel(std::ostream& out, benchmark::State& s) = 0; private: -#ifdef GPR_MU_COUNTERS - const size_t mu_locks_at_start_ = gpr_atm_no_barrier_load(&grpc_mu_locks); +#ifdef GPR_LOW_LEVEL_COUNTERS + const size_t mu_locks_at_start_ = gpr_atm_no_barrier_load(&gpr_mu_locks); + const size_t rmw_at_start_ = gpr_atm_no_barrier_load(&gpr_counter_rmw); #endif grpc_memory_counters counters_at_start_ = grpc_memory_counters_snapshot(); }; -- cgit v1.2.3 From 7f4d30a0321a042d4f8512815c1029cbaad6fac8 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 21 Feb 2017 10:24:00 -0800 Subject: Separate CAS/ADD RMWs --- include/grpc/impl/codegen/atm_gcc_atomic.h | 23 ++++++++++++++--------- src/core/lib/support/sync_posix.c | 3 ++- test/cpp/microbenchmarks/bm_closure.cc | 15 +++++++++++---- test/cpp/microbenchmarks/bm_fullstack.cc | 18 +++++++++++++----- tools/profiling/microbenchmarks/bm2bq.py | 3 ++- 5 files changed, 42 insertions(+), 20 deletions(-) (limited to 'include/grpc/impl') diff --git a/include/grpc/impl/codegen/atm_gcc_atomic.h b/include/grpc/impl/codegen/atm_gcc_atomic.h index e5a623f723..4bd3b25741 100644 --- a/include/grpc/impl/codegen/atm_gcc_atomic.h +++ b/include/grpc/impl/codegen/atm_gcc_atomic.h @@ -41,12 +41,17 @@ typedef intptr_t gpr_atm; #ifdef GPR_LOW_LEVEL_COUNTERS -extern gpr_atm gpr_counter_rmw; +extern gpr_atm gpr_counter_atm_cas; +extern gpr_atm gpr_counter_atm_add; #define GPR_ATM_INC_COUNTER(counter) \ __atomic_fetch_add(&counter, 1, __ATOMIC_RELAXED) -#define GPR_ATM_INC_RMW_THEN(blah) (GPR_ATM_INC_COUNTER(gpr_counter_rmw), blah) +#define GPR_ATM_INC_CAS_THEN(blah) \ + (GPR_ATM_INC_COUNTER(gpr_counter_atm_cas), blah) +#define GPR_ATM_INC_ADD_THEN(blah) \ + (GPR_ATM_INC_COUNTER(gpr_counter_atm_add), blah) #else -#define GPR_ATM_INC_RMW_THEN(blah) blah +#define GPR_ATM_INC_CAS_THEN(blah) blah +#define GPR_ATM_INC_ADD_THEN(blah) blah #endif #define gpr_atm_full_barrier() (__atomic_thread_fence(__ATOMIC_SEQ_CST)) @@ -59,28 +64,28 @@ extern gpr_atm gpr_counter_rmw; (__atomic_store_n((p), (intptr_t)(value), __ATOMIC_RELAXED)) #define gpr_atm_no_barrier_fetch_add(p, delta) \ - GPR_ATM_INC_RMW_THEN( \ + GPR_ATM_INC_ADD_THEN( \ __atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_RELAXED)) #define gpr_atm_full_fetch_add(p, delta) \ - GPR_ATM_INC_RMW_THEN( \ + GPR_ATM_INC_ADD_THEN( \ __atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_ACQ_REL)) static __inline int gpr_atm_no_barrier_cas(gpr_atm *p, gpr_atm o, gpr_atm n) { - return GPR_ATM_INC_RMW_THEN(__atomic_compare_exchange_n( + return GPR_ATM_INC_CAS_THEN(__atomic_compare_exchange_n( p, &o, n, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); } static __inline int gpr_atm_acq_cas(gpr_atm *p, gpr_atm o, gpr_atm n) { - return GPR_ATM_INC_RMW_THEN(__atomic_compare_exchange_n( + return GPR_ATM_INC_CAS_THEN(__atomic_compare_exchange_n( p, &o, n, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)); } static __inline int gpr_atm_rel_cas(gpr_atm *p, gpr_atm o, gpr_atm n) { - return GPR_ATM_INC_RMW_THEN(__atomic_compare_exchange_n( + return GPR_ATM_INC_CAS_THEN(__atomic_compare_exchange_n( p, &o, n, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)); } #define gpr_atm_full_xchg(p, n) \ - GPR_ATM_INC_RMW_THEN(__atomic_exchange_n((p), (n), __ATOMIC_ACQ_REL)) + GPR_ATM_INC_CAS_THEN(__atomic_exchange_n((p), (n), __ATOMIC_ACQ_REL)) #endif /* GRPC_IMPL_CODEGEN_ATM_GCC_ATOMIC_H */ diff --git a/src/core/lib/support/sync_posix.c b/src/core/lib/support/sync_posix.c index 3b7d780608..16e7d6e12a 100644 --- a/src/core/lib/support/sync_posix.c +++ b/src/core/lib/support/sync_posix.c @@ -44,7 +44,8 @@ #ifdef GPR_LOW_LEVEL_COUNTERS gpr_atm gpr_mu_locks = 0; -gpr_atm gpr_counter_rmw = 0; +gpr_atm gpr_counter_atm_cas = 0; +gpr_atm gpr_counter_atm_add = 0; #endif void gpr_mu_init(gpr_mu* mu) { GPR_ASSERT(pthread_mutex_init(mu, NULL) == 0); } diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc index 16d05781bb..03aede35b2 100644 --- a/test/cpp/microbenchmarks/bm_closure.cc +++ b/test/cpp/microbenchmarks/bm_closure.cc @@ -65,9 +65,13 @@ class TrackCounters { out << " locks/iter:" << ((double)(gpr_atm_no_barrier_load(&gpr_mu_locks) - mu_locks_at_start_) / (double)state_.iterations()) - << " atm_rmw/iter:" - << ((double)(gpr_atm_no_barrier_load(&gpr_counter_rmw) - - rmw_at_start_) / + << " atm_cas/iter:" + << ((double)(gpr_atm_no_barrier_load(&gpr_counter_atm_cas) - + atm_cas_at_start_) / + (double)state_.iterations()) + << " atm_add/iter:" + << ((double)(gpr_atm_no_barrier_load(&gpr_counter_atm_add) - + atm_add_at_start_) / (double)state_.iterations()); #endif state_.SetLabel(out.str()); @@ -77,7 +81,10 @@ class TrackCounters { benchmark::State& state_; #ifdef GPR_LOW_LEVEL_COUNTERS const size_t mu_locks_at_start_ = gpr_atm_no_barrier_load(&gpr_mu_locks); - const size_t rmw_at_start_ = gpr_atm_no_barrier_load(&gpr_counter_rmw); + const size_t atm_cas_at_start_ = + gpr_atm_no_barrier_load(&gpr_counter_atm_cas); + const size_t atm_add_at_start_ = + gpr_atm_no_barrier_load(&gpr_counter_atm_add); #endif }; diff --git a/test/cpp/microbenchmarks/bm_fullstack.cc b/test/cpp/microbenchmarks/bm_fullstack.cc index 5bb456ab46..48e131f1be 100644 --- a/test/cpp/microbenchmarks/bm_fullstack.cc +++ b/test/cpp/microbenchmarks/bm_fullstack.cc @@ -101,7 +101,8 @@ static void ApplyCommonChannelArguments(ChannelArguments* c) { #ifdef GPR_LOW_LEVEL_COUNTERS extern "C" gpr_atm gpr_mu_locks; -extern "C" gpr_atm gpr_counter_rmw; +extern "C" gpr_atm gpr_counter_atm_cas; +extern "C" gpr_atm gpr_counter_atm_add; #endif class BaseFixture { @@ -113,9 +114,13 @@ class BaseFixture { out << " locks/iter:" << ((double)(gpr_atm_no_barrier_load(&gpr_mu_locks) - mu_locks_at_start_) / (double)s.iterations()) - << " atm_rmw/iter:" - << ((double)(gpr_atm_no_barrier_load(&gpr_counter_rmw) - - rmw_at_start_) / + << " atm_cas/iter:" + << ((double)(gpr_atm_no_barrier_load(&gpr_counter_atm_cas) - + atm_cas_at_start_) / + (double)s.iterations()) + << " atm_add/iter:" + << ((double)(gpr_atm_no_barrier_load(&gpr_counter_atm_add) - + atm_add_at_start_) / (double)s.iterations()); #endif grpc_memory_counters counters_at_end = grpc_memory_counters_snapshot(); @@ -135,7 +140,10 @@ class BaseFixture { private: #ifdef GPR_LOW_LEVEL_COUNTERS const size_t mu_locks_at_start_ = gpr_atm_no_barrier_load(&gpr_mu_locks); - const size_t rmw_at_start_ = gpr_atm_no_barrier_load(&gpr_counter_rmw); + const size_t atm_cas_at_start_ = + gpr_atm_no_barrier_load(&gpr_counter_atm_cas); + const size_t atm_add_at_start_ = + gpr_atm_no_barrier_load(&gpr_counter_atm_add); #endif grpc_memory_counters counters_at_start_ = grpc_memory_counters_snapshot(); }; diff --git a/tools/profiling/microbenchmarks/bm2bq.py b/tools/profiling/microbenchmarks/bm2bq.py index 62a2f699c7..8ead4b4455 100755 --- a/tools/profiling/microbenchmarks/bm2bq.py +++ b/tools/profiling/microbenchmarks/bm2bq.py @@ -66,7 +66,8 @@ columns = [ ('cli_stream_stalls_per_iteration', 'float'), ('svr_transport_stalls_per_iteration', 'float'), ('svr_stream_stalls_per_iteration', 'float'), - ('atm_rmw_per_iteration', 'float') + ('atm_cas_per_iteration', 'float') + ('atm_add_per_iteration', 'float') ] if sys.argv[1] == '--schema': -- cgit v1.2.3