diff options
author | David Garcia Quintas <dgq@google.com> | 2015-06-14 14:42:04 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2015-06-14 14:42:04 -0700 |
commit | 1d5aca5de05d16968f0f1e208a9aa10d7c06caac (patch) | |
tree | 2137a78d046352a7c4613c7f8710331b000a9c05 /src/core | |
parent | 1b92d65bff76d2c2f58dd555b986d6147db0c39e (diff) |
Added flags support to grpc_op.
Which includes its propagation to grpc_ioreq and validation mechanisms for checking that only known bits are set ot in).
Also added an internal flag (GRPC_WRITE_INTERNAL_COMPRESS) mask for its use in signaling compressed messages.
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/surface/call.c | 41 | ||||
-rw-r--r-- | src/core/surface/call.h | 1 | ||||
-rw-r--r-- | src/core/transport/stream_op.h | 9 |
3 files changed, 49 insertions, 2 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index cead5e08dc..4b0a4c82f0 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -185,6 +185,7 @@ struct grpc_call { and a strong upper bound of a count of masters to be calculated. */ gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT]; grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT]; + gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT]; reqinfo_master masters[GRPC_IOREQ_OP_COUNT]; /* Dynamic array of ioreq's that have completed: the count of @@ -228,6 +229,7 @@ struct grpc_call { gpr_slice_buffer incoming_message; gpr_uint32 incoming_message_length; + gpr_uint32 incoming_message_flags; grpc_iomgr_closure destroy_closure; }; @@ -670,6 +672,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) { } else if (msg.length > 0) { call->reading_message = 1; call->incoming_message_length = msg.length; + call->incoming_message_flags = msg.flags; return 1; } else { finish_message(call); @@ -818,6 +821,7 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { grpc_ioreq_data data; + gpr_uint32 flags; grpc_metadata_batch mdb; size_t i; GPR_ASSERT(op->send_ops == NULL); @@ -844,8 +848,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { case WRITE_STATE_STARTED: if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; + flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE]; grpc_sopb_add_begin_message( - &call->send_ops, grpc_byte_buffer_length(data.send_message), 0); + &call->send_ops, grpc_byte_buffer_length(data.send_message), flags); copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops); op->send_ops = &call->send_ops; call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE; @@ -979,6 +984,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, have_ops |= 1u << op; call->request_data[op] = data; + call->request_flags[op] = reqs[i].flags; call->request_set[op] = set; } @@ -1189,6 +1195,14 @@ static void finish_batch_with_close(grpc_call *call, int success, void *tag) { grpc_cq_end_op(call->cq, tag, call, 1); } +static int are_write_flags_valid(gpr_uint32 flags) { + /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ + const gpr_uint32 allowed_write_positions = + (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK); + const gpr_uint32 invalid_positions = ~allowed_write_positions; + return !(flags & invalid_positions); +} + grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag) { grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT]; @@ -1211,30 +1225,43 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op = &ops[in]; switch (op->op) { case GRPC_OP_SEND_INITIAL_METADATA: + /* Flag validation: currently allow no flags */ + if (op->flags) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_INITIAL_METADATA; req->data.send_metadata.count = op->data.send_initial_metadata.count; req->data.send_metadata.metadata = op->data.send_initial_metadata.metadata; + req->flags = op->flags; break; case GRPC_OP_SEND_MESSAGE: + if (!are_write_flags_valid(op->flags)){ + return GRPC_CALL_ERROR_INVALID_FLAGS; + } req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_MESSAGE; req->data.send_message = op->data.send_message; + req->flags = ops->flags; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + /* Flag validation: currently allow no flags */ + if (op->flags) return GRPC_CALL_ERROR_INVALID_FLAGS; if (!call->is_client) { return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_CLOSE; + req->flags = op->flags; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: + /* Flag validation: currently allow no flags */ + if (op->flags) return GRPC_CALL_ERROR_INVALID_FLAGS; if (call->is_client) { return GRPC_CALL_ERROR_NOT_ON_CLIENT; } req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_TRAILING_METADATA; + req->flags = op->flags; req->data.send_metadata.count = op->data.send_status_from_server.trailing_metadata_count; req->data.send_metadata.metadata = @@ -1248,24 +1275,33 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->op = GRPC_IOREQ_SEND_CLOSE; break; case GRPC_OP_RECV_INITIAL_METADATA: + /* Flag validation: currently allow no flags */ + if (op->flags) return GRPC_CALL_ERROR_INVALID_FLAGS; if (!call->is_client) { return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; req->data.recv_metadata = op->data.recv_initial_metadata; + req->flags = op->flags; break; case GRPC_OP_RECV_MESSAGE: + /* Flag validation: currently allow no flags */ + if (op->flags) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_MESSAGE; req->data.recv_message = op->data.recv_message; + req->flags = op->flags; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: + /* Flag validation: currently allow no flags */ + if (op->flags) return GRPC_CALL_ERROR_INVALID_FLAGS; if (!call->is_client) { return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_STATUS; + req->flags = op->flags; req->data.recv_status.set_value = set_status_value_directly; req->data.recv_status.user_data = op->data.recv_status_on_client.status; req = &reqs[out++]; @@ -1283,8 +1319,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, finish_func = finish_batch_with_close; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: + /* Flag validation: currently allow no flags */ + if (op->flags) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_STATUS; + req->flags = op->flags; req->data.recv_status.set_value = set_cancelled_value; req->data.recv_status.user_data = op->data.recv_close_on_server.cancelled; diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 9116538948..7a14161253 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -79,6 +79,7 @@ typedef union { typedef struct { grpc_ioreq_op op; grpc_ioreq_data data; + gpr_uint32 flags; /**< A copy of the write flags from grpc_op */ } grpc_ioreq; typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success, diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 5215cc87b1..e080701e2d 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -58,11 +58,18 @@ typedef enum grpc_stream_op_code { GRPC_OP_SLICE } grpc_stream_op_code; +/** Internal bit flag for grpc_begin_message's \a flags signaling the use of + * compression for the message */ +#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u) +/** Mask of all valid internal flags. */ +#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) + /* Arguments for GRPC_OP_BEGIN_MESSAGE */ typedef struct grpc_begin_message { /* How many bytes of data will this message contain */ gpr_uint32 length; - /* Write flags for the message: see grpc.h GRPC_WRITE_xxx */ + /* Write flags for the message: see grpc.h GRPC_WRITE_* for the public bits, + * GRPC_WRITE_INTERNAL_* for the internal ones. */ gpr_uint32 flags; } grpc_begin_message; |