diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/surface/call.c | 94 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_channel.py | 7 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_common.py | 4 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_server.py | 21 |
4 files changed, 74 insertions, 52 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 0fae3effb6..3c563bcc6f 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -153,8 +153,6 @@ struct grpc_call { bool destroy_called; /** flag indicating that cancellation is inherited */ bool cancellation_is_inherited; - /** bitmask of live batches */ - uint8_t used_batches; /** which ops are in-flight */ bool sent_initial_metadata; bool sending_message; @@ -1012,25 +1010,48 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) { return !(flags & invalid_positions); } -static batch_control *allocate_batch_control(grpc_call *call) { - size_t i; - for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) { - if ((call->used_batches & (1 << i)) == 0) { - call->used_batches = (uint8_t)(call->used_batches | (uint8_t)(1 << i)); - return &call->active_batches[i]; - } +static int batch_slot_for_op(grpc_op_type type) { + switch (type) { + case GRPC_OP_SEND_INITIAL_METADATA: + return 0; + case GRPC_OP_SEND_MESSAGE: + return 1; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + case GRPC_OP_SEND_STATUS_FROM_SERVER: + return 2; + case GRPC_OP_RECV_INITIAL_METADATA: + return 3; + case GRPC_OP_RECV_MESSAGE: + return 4; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + case GRPC_OP_RECV_STATUS_ON_CLIENT: + return 5; + } + GPR_UNREACHABLE_CODE(return 123456789); +} + +static batch_control *allocate_batch_control(grpc_call *call, + const grpc_op *ops, + size_t num_ops) { + int slot = batch_slot_for_op(ops[0].op); + for (size_t i = 1; i < num_ops; i++) { + int op_slot = batch_slot_for_op(ops[i].op); + slot = GPR_MIN(slot, op_slot); + } + batch_control *bctl = &call->active_batches[slot]; + if (bctl->call != NULL) { + return NULL; } - return NULL; + memset(bctl, 0, sizeof(*bctl)); + bctl->call = call; + return bctl; } static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, grpc_cq_completion *storage) { batch_control *bctl = user_data; grpc_call *call = bctl->call; - gpr_mu_lock(&call->mu); - call->used_batches = (uint8_t)( - call->used_batches & ~(uint8_t)(1 << (bctl - call->active_batches))); - gpr_mu_unlock(&call->mu); + bctl->call = NULL; GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } @@ -1113,12 +1134,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ + bctl->call = NULL; grpc_closure_run(exec_ctx, bctl->notify_tag, error); - gpr_mu_lock(&call->mu); - bctl->call->used_batches = - (uint8_t)(bctl->call->used_batches & - ~(uint8_t)(1 << (bctl - bctl->call->active_batches))); - gpr_mu_unlock(&call->mu); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { /* unrefs bctl->error */ @@ -1330,6 +1347,11 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, finish_batch_step(exec_ctx, bctl); } +static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p, + grpc_cq_completion *completion) { + gpr_free(completion); +} + static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, @@ -1344,32 +1366,34 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_metadata compression_md; GPR_TIMER_BEGIN("grpc_call_start_batch", 0); - GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); - /* TODO(ctiller): this feels like it could be made lock-free */ - gpr_mu_lock(&call->mu); - bctl = allocate_batch_control(call); - memset(bctl, 0, sizeof(*bctl)); - bctl->call = call; - bctl->notify_tag = notify_tag; - bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); - - grpc_transport_stream_op *stream_op = &bctl->op; - memset(stream_op, 0, sizeof(*stream_op)); - stream_op->covered_by_poller = true; - if (nops == 0) { - GRPC_CALL_INTERNAL_REF(call, "completion"); if (!is_notify_tag_closure) { grpc_cq_begin_op(call->cq, notify_tag); + grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, + free_no_op_completion, NULL, + gpr_malloc(sizeof(grpc_cq_completion))); + } else { + grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE); } - gpr_mu_unlock(&call->mu); - post_batch_completion(exec_ctx, bctl); error = GRPC_CALL_OK; goto done; } + /* TODO(ctiller): this feels like it could be made lock-free */ + bctl = allocate_batch_control(call, ops, nops); + if (bctl == NULL) { + return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + } + bctl->notify_tag = notify_tag; + bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); + + gpr_mu_lock(&call->mu); + grpc_transport_stream_op *stream_op = &bctl->op; + memset(stream_op, 0, sizeof(*stream_op)); + stream_op->covered_by_poller = true; + /* rewrite batch ops into a transport op */ for (i = 0; i < nops; i++) { op = &ops[i]; diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 26d93faf75..e602f39aeb 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -43,7 +43,6 @@ _USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__) _EMPTY_FLAGS = 0 _INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) -_EMPTY_METADATA = cygrpc.Metadata(()) _UNARY_UNARY_INITIAL_DUE = (cygrpc.OperationType.send_initial_metadata, cygrpc.OperationType.send_message, @@ -138,8 +137,8 @@ def _abort(state, code, details): state.code = code state.details = details if state.initial_metadata is None: - state.initial_metadata = _EMPTY_METADATA - state.trailing_metadata = _EMPTY_METADATA + state.initial_metadata = _common.EMPTY_METADATA + state.trailing_metadata = _common.EMPTY_METADATA def _handle_event(event, state, response_deserializer): @@ -435,7 +434,7 @@ def _start_unary_request(request, timeout, request_serializer): deadline, deadline_timespec = _deadline(timeout) serialized_request = _common.serialize(request, request_serializer) if serialized_request is None: - state = _RPCState((), _EMPTY_METADATA, _EMPTY_METADATA, + state = _RPCState((), _common.EMPTY_METADATA, _common.EMPTY_METADATA, grpc.StatusCode.INTERNAL, 'Exception serializing request!') rendezvous = _Rendezvous(state, None, None, deadline) diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index f9accd75a9..6879e1780b 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -37,7 +37,7 @@ import six import grpc from grpc._cython import cygrpc -_EMPTY_METADATA = cygrpc.Metadata(()) +EMPTY_METADATA = cygrpc.Metadata(()) CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { cygrpc.ConnectivityState.idle: @@ -107,7 +107,7 @@ def channel_args(options): def cygrpc_metadata(application_metadata): - return _EMPTY_METADATA if application_metadata is None else cygrpc.Metadata( + return EMPTY_METADATA if application_metadata is None else cygrpc.Metadata( cygrpc.Metadatum(encode(key), encode(value)) for key, value in application_metadata) diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index bf2743c16b..b8e7ea17f7 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -57,7 +57,6 @@ _CLOSED = 'closed' _CANCELLED = 'cancelled' _EMPTY_FLAGS = 0 -_EMPTY_METADATA = cygrpc.Metadata(()) _UNEXPECTED_EXIT_SERVER_GRACE = 1.0 @@ -143,7 +142,7 @@ def _abort(state, call, code, details): effective_details = details if state.details is None else state.details if state.initial_metadata_allowed: operations = (cygrpc.operation_send_initial_metadata( - _EMPTY_METADATA, _EMPTY_FLAGS), + _common.EMPTY_METADATA, _EMPTY_FLAGS), cygrpc.operation_send_status_from_server( _common.cygrpc_metadata(state.trailing_metadata), effective_code, effective_details, _EMPTY_FLAGS),) @@ -416,7 +415,7 @@ def _send_response(rpc_event, state, serialized_response): else: if state.initial_metadata_allowed: operations = (cygrpc.operation_send_initial_metadata( - _EMPTY_METADATA, _EMPTY_FLAGS), + _common.EMPTY_METADATA, _EMPTY_FLAGS), cygrpc.operation_send_message(serialized_response, _EMPTY_FLAGS),) state.initial_metadata_allowed = False @@ -446,8 +445,8 @@ def _status(rpc_event, state, serialized_response): ] if state.initial_metadata_allowed: operations.append( - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS)) + cygrpc.operation_send_initial_metadata( + _common.EMPTY_METADATA, _EMPTY_FLAGS)) if serialized_response is not None: operations.append( cygrpc.operation_send_message(serialized_response, @@ -549,12 +548,12 @@ def _find_method_handler(rpc_event, generic_handlers): def _handle_unrecognized_method(rpc_event): - operations = ( - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, _EMPTY_FLAGS), - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( - _EMPTY_METADATA, cygrpc.StatusCode.unimplemented, - b'Method not found!', _EMPTY_FLAGS),) + operations = (cygrpc.operation_send_initial_metadata(_common.EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), + cygrpc.operation_send_status_from_server( + _common.EMPTY_METADATA, cygrpc.StatusCode.unimplemented, + b'Method not found!', _EMPTY_FLAGS),) rpc_state = _RPCState() rpc_event.operation_call.start_server_batch( operations, lambda ignored_event: (rpc_state, (),)) |