diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/surface/call.c | 41 | ||||
-rw-r--r-- | src/core/surface/call.h | 1 | ||||
-rw-r--r-- | src/core/surface/server.c | 3 | ||||
-rw-r--r-- | src/core/transport/stream_op.h | 9 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 8 | ||||
-rw-r--r-- | src/csharp/ext/grpc_csharp_ext.c | 24 | ||||
-rw-r--r-- | src/node/ext/call.cc | 1 | ||||
-rw-r--r-- | src/php/ext/grpc/call.c | 1 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/_c/utility.c | 1 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_call.c | 1 |
10 files changed, 88 insertions, 2 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index cead5e08dc..5cdd7cd0f6 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -185,6 +185,7 @@ struct grpc_call { and a strong upper bound of a count of masters to be calculated. */ gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT]; grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT]; + gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT]; reqinfo_master masters[GRPC_IOREQ_OP_COUNT]; /* Dynamic array of ioreq's that have completed: the count of @@ -228,6 +229,7 @@ struct grpc_call { gpr_slice_buffer incoming_message; gpr_uint32 incoming_message_length; + gpr_uint32 incoming_message_flags; grpc_iomgr_closure destroy_closure; }; @@ -670,6 +672,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) { } else if (msg.length > 0) { call->reading_message = 1; call->incoming_message_length = msg.length; + call->incoming_message_flags = msg.flags; return 1; } else { finish_message(call); @@ -818,6 +821,7 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { grpc_ioreq_data data; + gpr_uint32 flags; grpc_metadata_batch mdb; size_t i; GPR_ASSERT(op->send_ops == NULL); @@ -844,8 +848,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { case WRITE_STATE_STARTED: if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; + flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE]; grpc_sopb_add_begin_message( - &call->send_ops, grpc_byte_buffer_length(data.send_message), 0); + &call->send_ops, grpc_byte_buffer_length(data.send_message), flags); copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops); op->send_ops = &call->send_ops; call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE; @@ -979,6 +984,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, have_ops |= 1u << op; call->request_data[op] = data; + call->request_flags[op] = reqs[i].flags; call->request_set[op] = set; } @@ -1189,6 +1195,14 @@ static void finish_batch_with_close(grpc_call *call, int success, void *tag) { grpc_cq_end_op(call->cq, tag, call, 1); } +static int are_write_flags_valid(gpr_uint32 flags) { + /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ + const gpr_uint32 allowed_write_positions = + (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK); + const gpr_uint32 invalid_positions = ~allowed_write_positions; + return !(flags & invalid_positions); +} + grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag) { grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT]; @@ -1211,30 +1225,43 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op = &ops[in]; switch (op->op) { case GRPC_OP_SEND_INITIAL_METADATA: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_INITIAL_METADATA; req->data.send_metadata.count = op->data.send_initial_metadata.count; req->data.send_metadata.metadata = op->data.send_initial_metadata.metadata; + req->flags = op->flags; break; case GRPC_OP_SEND_MESSAGE: + if (!are_write_flags_valid(op->flags)){ + return GRPC_CALL_ERROR_INVALID_FLAGS; + } req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_MESSAGE; req->data.send_message = op->data.send_message; + req->flags = ops->flags; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; if (!call->is_client) { return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_CLOSE; + req->flags = op->flags; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; if (call->is_client) { return GRPC_CALL_ERROR_NOT_ON_CLIENT; } req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_TRAILING_METADATA; + req->flags = op->flags; req->data.send_metadata.count = op->data.send_status_from_server.trailing_metadata_count; req->data.send_metadata.metadata = @@ -1248,24 +1275,33 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->op = GRPC_IOREQ_SEND_CLOSE; break; case GRPC_OP_RECV_INITIAL_METADATA: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; if (!call->is_client) { return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; req->data.recv_metadata = op->data.recv_initial_metadata; + req->flags = op->flags; break; case GRPC_OP_RECV_MESSAGE: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_MESSAGE; req->data.recv_message = op->data.recv_message; + req->flags = op->flags; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; if (!call->is_client) { return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_STATUS; + req->flags = op->flags; req->data.recv_status.set_value = set_status_value_directly; req->data.recv_status.user_data = op->data.recv_status_on_client.status; req = &reqs[out++]; @@ -1283,8 +1319,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, finish_func = finish_batch_with_close; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_STATUS; + req->flags = op->flags; req->data.recv_status.set_value = set_cancelled_value; req->data.recv_status.user_data = op->data.recv_close_on_server.cancelled; diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 9116538948..7a14161253 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -79,6 +79,7 @@ typedef union { typedef struct { grpc_ioreq_op op; grpc_ioreq_data data; + gpr_uint32 flags; /**< A copy of the write flags from grpc_op */ } grpc_ioreq; typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success, diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 733f0e8a11..73995202a4 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -1111,6 +1111,7 @@ static void begin_call(grpc_server *server, call_data *calld, rc->data.batch.details->deadline = calld->deadline; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.batch.initial_metadata; + r->flags = 0; r++; publish = publish_registered_or_batch; break; @@ -1118,10 +1119,12 @@ static void begin_call(grpc_server *server, call_data *calld, *rc->data.registered.deadline = calld->deadline; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.registered.initial_metadata; + r->flags = 0; r++; if (rc->data.registered.optional_payload) { r->op = GRPC_IOREQ_RECV_MESSAGE; r->data.recv_message = rc->data.registered.optional_payload; + r->flags = 0; r++; } publish = publish_registered_or_batch; diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 5215cc87b1..e080701e2d 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -58,11 +58,18 @@ typedef enum grpc_stream_op_code { GRPC_OP_SLICE } grpc_stream_op_code; +/** Internal bit flag for grpc_begin_message's \a flags signaling the use of + * compression for the message */ +#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u) +/** Mask of all valid internal flags. */ +#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) + /* Arguments for GRPC_OP_BEGIN_MESSAGE */ typedef struct grpc_begin_message { /* How many bytes of data will this message contain */ gpr_uint32 length; - /* Write flags for the message: see grpc.h GRPC_WRITE_xxx */ + /* Write flags for the message: see grpc.h GRPC_WRITE_* for the public bits, + * GRPC_WRITE_INTERNAL_* for the internal ones. */ gpr_uint32 flags; } grpc_begin_message; diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 1068111e3f..3212f770f4 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -224,11 +224,13 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) { ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA; ops[*nops].data.send_initial_metadata.count = initial_metadata_count_; ops[*nops].data.send_initial_metadata.metadata = initial_metadata_; + ops[*nops].flags = 0; (*nops)++; } if (recv_initial_metadata_) { ops[*nops].op = GRPC_OP_RECV_INITIAL_METADATA; ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_; + ops[*nops].flags = 0; (*nops)++; } if (send_message_ || send_message_buffer_) { @@ -245,15 +247,18 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) { } ops[*nops].op = GRPC_OP_SEND_MESSAGE; ops[*nops].data.send_message = send_buf_; + ops[*nops].flags = 0; (*nops)++; } if (recv_message_ || recv_message_buffer_) { ops[*nops].op = GRPC_OP_RECV_MESSAGE; ops[*nops].data.recv_message = &recv_buf_; + ops[*nops].flags = 0; (*nops)++; } if (client_send_close_) { ops[*nops].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + ops[*nops].flags = 0; (*nops)++; } if (recv_status_) { @@ -264,6 +269,7 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) { ops[*nops].data.recv_status_on_client.status_details = &status_details_; ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_; + ops[*nops].flags = 0; (*nops)++; } if (send_status_available_) { @@ -275,11 +281,13 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) { ops[*nops].data.send_status_from_server.status = send_status_code_; ops[*nops].data.send_status_from_server.status_details = send_status_details_.empty() ? nullptr : send_status_details_.c_str(); + ops[*nops].flags = 0; (*nops)++; } if (recv_closed_) { ops[*nops].op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops[*nops].data.recv_close_on_server.cancelled = &cancelled_buf_; + ops[*nops].flags = 0; (*nops)++; } } diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 8337218255..f01c6c9183 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -417,18 +417,23 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; + ops[0].flags = 0; ops[1].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ops[1].data.send_message = ctx->send_message; + ops[1].flags = 0; ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + ops[2].flags = 0; ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[3].flags = 0; ops[4].op = GRPC_OP_RECV_MESSAGE; ops[4].data.recv_message = &(ctx->recv_message); + ops[4].flags = 0; ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; ops[5].data.recv_status_on_client.trailing_metadata = @@ -440,6 +445,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, &(ctx->recv_status_on_client.status_details); ops[5].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + ops[5].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } @@ -456,12 +462,15 @@ grpcsharp_call_start_client_streaming(grpc_call *call, ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; + ops[0].flags = 0; ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[1].flags = 0; ops[2].op = GRPC_OP_RECV_MESSAGE; ops[2].data.recv_message = &(ctx->recv_message); + ops[2].flags = 0; ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT; ops[3].data.recv_status_on_client.trailing_metadata = @@ -473,6 +482,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call, &(ctx->recv_status_on_client.status_details); ops[3].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + ops[3].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } @@ -488,15 +498,19 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; + ops[0].flags = 0; ops[1].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ops[1].data.send_message = ctx->send_message; + ops[1].flags = 0; ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + ops[2].flags = 0; ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[3].flags = 0; ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT; ops[4].data.recv_status_on_client.trailing_metadata = @@ -508,6 +522,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( &(ctx->recv_status_on_client.status_details); ops[4].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + ops[4].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } @@ -524,9 +539,11 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; + ops[0].flags = 0; ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[1].flags = 0; ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT; ops[2].data.recv_status_on_client.trailing_metadata = @@ -538,6 +555,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, &(ctx->recv_status_on_client.status_details); ops[2].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + ops[2].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } @@ -550,6 +568,7 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, ops[0].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ops[0].data.send_message = ctx->send_message; + ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } @@ -560,6 +579,7 @@ grpcsharp_call_send_close_from_client(grpc_call *call, /* TODO: don't use magic number */ grpc_op ops[1]; ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } @@ -577,6 +597,7 @@ grpcsharp_call_send_status_from_server(grpc_call *call, gpr_strdup(status_details); ops[0].data.send_status_from_server.trailing_metadata = NULL; ops[0].data.send_status_from_server.trailing_metadata_count = 0; + ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } @@ -587,6 +608,7 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) { grpc_op ops[1]; ops[0].op = GRPC_OP_RECV_MESSAGE; ops[0].data.recv_message = &(ctx->recv_message); + ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } @@ -597,10 +619,12 @@ grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) { ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; ops[0].data.send_initial_metadata.count = 0; ops[0].data.send_initial_metadata.metadata = NULL; + ops[0].flags = 0; ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops[1].data.recv_close_on_server.cancelled = (&ctx->recv_close_on_server_cancelled); + ops[1].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 8cc3e38cd9..15c9b2d97d 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -550,6 +550,7 @@ NAN_METHOD(Call::StartBatch) { } uint32_t type = keys->Get(i)->Uint32Value(); ops[i].op = static_cast<grpc_op_type>(type); + ops[i].flags = 0; switch (type) { case GRPC_OP_SEND_INITIAL_METADATA: op.reset(new SendMetadataOp()); diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index 9f651ff56f..10a4946ea6 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -397,6 +397,7 @@ PHP_METHOD(Call, startBatch) { goto cleanup; } ops[op_num].op = (grpc_op_type)index; + ops[op_num].flags = 0; op_num++; } error = grpc_call_start_batch(call->wrapped, ops, op_num, call->wrapped); diff --git a/src/python/src/grpc/_adapter/_c/utility.c b/src/python/src/grpc/_adapter/_c/utility.c index e3139f2887..64b3428f2b 100644 --- a/src/python/src/grpc/_adapter/_c/utility.c +++ b/src/python/src/grpc/_adapter/_c/utility.c @@ -168,6 +168,7 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) { return 0; } c_op.op = type; + c_op.flags = 0; switch (type) { case GRPC_OP_SEND_INITIAL_METADATA: if (!pygrpc_cast_pylist_to_send_metadata( diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 29f870f929..33bfd006da 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -507,6 +507,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { NUM2INT(this_op)); }; st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op); + st->ops[st->op_num].flags = 0; st->op_num++; } } |