diff options
author | Nicolas Noble <nicolasnoble@users.noreply.github.com> | 2016-03-30 17:05:43 -0700 |
---|---|---|
committer | Nicolas Noble <nicolasnoble@users.noreply.github.com> | 2016-03-30 17:05:43 -0700 |
commit | a5dc80d22ae03f76525f030fb1bfd47c0d8266a2 (patch) | |
tree | e35f0890852379cde5b5464c564aa4fcbb6f4939 /src/core/lib/surface | |
parent | 90da73714eac582686736c48d113bdfc75b15169 (diff) | |
parent | c2011586fae18c7b247686428a2469198589a1f9 (diff) |
Merge pull request #5691 from ctiller/idempotent
Idempotent request support for core
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.c | 15 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 30 |
2 files changed, 40 insertions, 5 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 0219d7e6d0..77ad410b50 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -912,7 +912,7 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { *(int *)dest = (status != GRPC_STATUS_OK); } -static int are_write_flags_valid(uint32_t flags) { +static bool are_write_flags_valid(uint32_t flags) { /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ const uint32_t allowed_write_positions = (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK); @@ -920,6 +920,15 @@ static int are_write_flags_valid(uint32_t flags) { return !(flags & invalid_positions); } +static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) { + /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ + uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK; + if (!is_client) { + invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; + } + return !(flags & invalid_positions); +} + static batch_control *allocate_batch_control(grpc_call *call) { size_t i; for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) { @@ -1199,7 +1208,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, switch (op->op) { case GRPC_OP_SEND_INITIAL_METADATA: /* Flag validation: currently allow no flags */ - if (op->flags != 0) { + if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } @@ -1223,6 +1232,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->metadata_batch[0][0].deadline = call->send_deadline; stream_op.send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; + stream_op.send_idempotent_request = + (op->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 080734e9d5..55e5e49e11 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -100,6 +100,7 @@ typedef struct requested_call { typedef struct channel_registered_method { registered_method *server_registered_method; + uint32_t flags; grpc_mdstr *method; grpc_mdstr *host; } channel_registered_method; @@ -152,6 +153,7 @@ struct call_data { grpc_completion_queue *cq_new; grpc_metadata_batch *recv_initial_metadata; + bool recv_idempotent_request; grpc_metadata_array initial_metadata; grpc_closure got_initial_metadata; @@ -171,6 +173,7 @@ struct request_matcher { struct registered_method { char *method; char *host; + uint32_t flags; request_matcher request_matcher; registered_method *next; }; @@ -468,6 +471,9 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if (!rm) break; if (rm->host != calld->host) continue; if (rm->method != calld->path) continue; + if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && + !calld->recv_idempotent_request) + continue; finish_start_new_rpc(exec_ctx, server, elem, &rm->server_registered_method->request_matcher); return; @@ -480,6 +486,9 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if (!rm) break; if (rm->host != NULL) continue; if (rm->method != calld->path) continue; + if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && + !calld->recv_idempotent_request) + continue; finish_start_new_rpc(exec_ctx, server, elem, &rm->server_registered_method->request_matcher); return; @@ -598,9 +607,11 @@ static void server_mutate_op(grpc_call_element *elem, call_data *calld = elem->call_data; if (op->recv_initial_metadata != NULL) { + GPR_ASSERT(op->recv_idempotent_request == NULL); calld->recv_initial_metadata = op->recv_initial_metadata; calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata; + op->recv_idempotent_request = &calld->recv_idempotent_request; } } @@ -830,10 +841,12 @@ static int streq(const char *a, const char *b) { } void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host) { + const char *host, uint32_t flags) { registered_method *m; - GRPC_API_TRACE("grpc_server_register_method(server=%p, method=%s, host=%s)", - 3, (server, method, host)); + GRPC_API_TRACE( + "grpc_server_register_method(server=%p, method=%s, host=%s, " + "flags=0x%08x)", + 4, (server, method, host, flags)); if (!method) { gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL"); @@ -846,12 +859,18 @@ void *grpc_server_register_method(grpc_server *server, const char *method, return NULL; } } + if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) { + gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x", + flags); + return NULL; + } m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); request_matcher_init(&m->request_matcher, server->max_requested_calls); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; + m->flags = flags; server->registered_methods = m; return m; } @@ -930,6 +949,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, if (probes > max_probes) max_probes = probes; crm = &chand->registered_methods[(hash + probes) % slots]; crm->server_registered_method = rm; + crm->flags = rm->flags; crm->host = host; crm->method = method; } @@ -1247,6 +1267,10 @@ static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server, cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); rc->data.batch.details->deadline = calld->deadline; + rc->data.batch.details->flags = + 0 | (calld->recv_idempotent_request + ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST + : 0); break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; |