aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-04-04 08:10:47 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-04-04 08:10:47 -0700
commit06cb1a9d938b566f79e59e5e9ba31979e13274ba (patch)
tree4033dcf85a479bd7004c3eb852bf9d0187b756e9
parent307a7207a00b1ea34f6f5edbfd3e46faf7222aaf (diff)
Initial interface rework to allow knowing whether to pull payload at registration, not at request time
-rw-r--r--include/grpc/grpc.h15
-rw-r--r--include/grpc/impl/codegen/grpc_types.h4
-rw-r--r--src/core/lib/surface/server.c139
-rw-r--r--src/cpp/server/server.cc18
-rw-r--r--test/core/bad_client/bad_client.c6
-rw-r--r--test/core/surface/server_test.c20
6 files changed, 121 insertions, 81 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 8b460722e2..5c868aece3 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -289,6 +289,14 @@ GRPCAPI grpc_call_error grpc_server_request_call(
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag_new);
+/** How to handle payloads for a registered method */
+typedef enum {
+ /** Don't try to read the payload */
+ GRPC_SRM_PAYLOAD_NONE,
+ /** Read the initial payload as a byte buffer */
+ GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER
+} grpc_server_register_method_payload_handling;
+
/** Registers a method in the server.
Methods to this (host, method) pair will not be reported by
grpc_server_request_call, but instead be reported by
@@ -296,9 +304,10 @@ GRPCAPI grpc_call_error grpc_server_request_call(
registered_method (as returned by this function).
Must be called before grpc_server_start.
Returns NULL on failure. */
-GRPCAPI void *grpc_server_register_method(grpc_server *server,
- const char *method, const char *host,
- uint32_t flags);
+GRPCAPI void *grpc_server_register_method(
+ grpc_server *server, const char *method, const char *host,
+ grpc_server_register_method_payload_handling payload_handling,
+ uint32_t flags);
/** Request notification of a new pre-registered call. 'cq_for_notification'
must have been registered to the server via
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index b09b1cdf44..38da99b4d7 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -185,7 +185,9 @@ typedef enum grpc_call_error {
server */
GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE,
/** this batch of operations leads to more operations than allowed */
- GRPC_CALL_ERROR_BATCH_TOO_BIG
+ GRPC_CALL_ERROR_BATCH_TOO_BIG,
+ /** payload type requested is not the type registered */
+ GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH
} grpc_call_error;
/* Write Flags: */
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 1898bee1c1..ac5a2b4160 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -173,6 +173,7 @@ struct request_matcher {
struct registered_method {
char *method;
char *host;
+ grpc_server_register_method_payload_handling payload_handling;
uint32_t flags;
request_matcher request_matcher;
registered_method *next;
@@ -417,6 +418,69 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
&op);
}
+static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
+ void *user_data, bool success);
+
+static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
+ gpr_slice slice = value->slice;
+ size_t len = GPR_SLICE_LENGTH(slice);
+
+ if (len + 1 > *capacity) {
+ *capacity = GPR_MAX(len + 1, *capacity * 2);
+ *dest = gpr_realloc(*dest, *capacity);
+ }
+ memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
+}
+
+static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
+ call_data *calld, requested_call *rc) {
+ grpc_op ops[1];
+ grpc_op *op = ops;
+
+ memset(ops, 0, sizeof(ops));
+
+ /* called once initial metadata has been read by the call, but BEFORE
+ the ioreq to fetch it out of the call has been executed.
+ This means metadata related fields can be relied on in calld, but to
+ fill in the metadata array passed by the client, we need to perform
+ an ioreq op, that should complete immediately. */
+
+ grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
+ grpc_closure_init(&rc->publish, publish_registered_or_batch, rc);
+ *rc->call = calld->call;
+ calld->cq_new = rc->cq_for_notification;
+ GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
+ switch (rc->type) {
+ case BATCH_CALL:
+ GPR_ASSERT(calld->host != NULL);
+ GPR_ASSERT(calld->path != NULL);
+ cpstr(&rc->data.batch.details->host,
+ &rc->data.batch.details->host_capacity, calld->host);
+ cpstr(&rc->data.batch.details->method,
+ &rc->data.batch.details->method_capacity, calld->path);
+ rc->data.batch.details->deadline = calld->deadline;
+ rc->data.batch.details->flags =
+ 0 | (calld->recv_idempotent_request
+ ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
+ : 0);
+ break;
+ case REGISTERED_CALL:
+ *rc->data.registered.deadline = calld->deadline;
+ if (rc->data.registered.optional_payload) {
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = rc->data.registered.optional_payload;
+ op++;
+ }
+ break;
+ default:
+ GPR_UNREACHABLE_CODE(return );
+ }
+
+ GRPC_CALL_INTERNAL_REF(calld->call, "server");
+ grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops,
+ (size_t)(op - ops), &rc->publish);
+}
+
static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
grpc_call_element *elem, request_matcher *rm) {
call_data *calld = elem->call_data;
@@ -840,8 +904,10 @@ static int streq(const char *a, const char *b) {
return 0 == strcmp(a, b);
}
-void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host, uint32_t flags) {
+void *grpc_server_register_method(
+ grpc_server *server, const char *method, const char *host,
+ grpc_server_register_method_payload_handling payload_handling,
+ uint32_t flags) {
registered_method *m;
GRPC_API_TRACE(
"grpc_server_register_method(server=%p, method=%s, host=%s, "
@@ -1209,6 +1275,12 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
+ if ((optional_payload == NULL) !=
+ (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
+ goto done;
+ }
grpc_cq_begin_op(cq_for_notification, tag);
rc->type = REGISTERED_CALL;
rc->server = server;
@@ -1226,69 +1298,6 @@ done:
return error;
}
-static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
- void *user_data, bool success);
-
-static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
- gpr_slice slice = value->slice;
- size_t len = GPR_SLICE_LENGTH(slice);
-
- if (len + 1 > *capacity) {
- *capacity = GPR_MAX(len + 1, *capacity * 2);
- *dest = gpr_realloc(*dest, *capacity);
- }
- memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
-}
-
-static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- call_data *calld, requested_call *rc) {
- grpc_op ops[1];
- grpc_op *op = ops;
-
- memset(ops, 0, sizeof(ops));
-
- /* called once initial metadata has been read by the call, but BEFORE
- the ioreq to fetch it out of the call has been executed.
- This means metadata related fields can be relied on in calld, but to
- fill in the metadata array passed by the client, we need to perform
- an ioreq op, that should complete immediately. */
-
- grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
- grpc_closure_init(&rc->publish, publish_registered_or_batch, rc);
- *rc->call = calld->call;
- calld->cq_new = rc->cq_for_notification;
- GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
- switch (rc->type) {
- case BATCH_CALL:
- GPR_ASSERT(calld->host != NULL);
- GPR_ASSERT(calld->path != NULL);
- cpstr(&rc->data.batch.details->host,
- &rc->data.batch.details->host_capacity, calld->host);
- cpstr(&rc->data.batch.details->method,
- &rc->data.batch.details->method_capacity, calld->path);
- rc->data.batch.details->deadline = calld->deadline;
- rc->data.batch.details->flags =
- 0 | (calld->recv_idempotent_request
- ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
- : 0);
- break;
- case REGISTERED_CALL:
- *rc->data.registered.deadline = calld->deadline;
- if (rc->data.registered.optional_payload) {
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message = rc->data.registered.optional_payload;
- op++;
- }
- break;
- default:
- GPR_UNREACHABLE_CODE(return );
- }
-
- GRPC_CALL_INTERNAL_REF(calld->call, "server");
- grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops,
- (size_t)(op - ops), &rc->publish);
-}
-
static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
grpc_cq_completion *c) {
requested_call *rc = req;
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 3e44c502fc..fafe31e84c 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -321,6 +321,19 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
g_callbacks.reset(callbacks);
}
+static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
+ RpcServiceMethod* method) {
+ switch (method->method_type()) {
+ case RpcMethod::NORMAL_RPC:
+ case RpcMethod::SERVER_STREAMING:
+ return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
+ case RpcMethod::CLIENT_STREAMING:
+ case RpcMethod::BIDI_STREAMING:
+ return GRPC_SRM_PAYLOAD_NONE;
+ }
+ GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
+}
+
bool Server::RegisterService(const grpc::string* host, Service* service) {
bool has_async_methods = service->has_async_methods();
if (has_async_methods) {
@@ -334,8 +347,9 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
continue;
}
RpcServiceMethod* method = it->get();
- void* tag = grpc_server_register_method(server_, method->name(),
- host ? host->c_str() : nullptr, 0);
+ void* tag = grpc_server_register_method(
+ server_, method->name(), host ? host->c_str() : nullptr,
+ PayloadHandlingForMethod(method), 0);
if (tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index 433ecf69df..1e1b227159 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -107,9 +107,9 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
gpr_event_init(&a.done_write);
a.validator = validator;
grpc_server_register_completion_queue(a.server, a.cq, NULL);
- a.registered_method =
- grpc_server_register_method(a.server, GRPC_BAD_CLIENT_REGISTERED_METHOD,
- GRPC_BAD_CLIENT_REGISTERED_HOST, 0);
+ a.registered_method = grpc_server_register_method(
+ a.server, GRPC_BAD_CLIENT_REGISTERED_METHOD,
+ GRPC_BAD_CLIENT_REGISTERED_HOST, GRPC_SRM_PAYLOAD_NONE, 0);
grpc_server_start(a.server);
transport = grpc_create_chttp2_transport(&exec_ctx, NULL, sfd.server, 0);
server_setup_transport(&a, transport);
diff --git a/test/core/surface/server_test.c b/test/core/surface/server_test.c
index 4c62d8caad..7bb45434f4 100644
--- a/test/core/surface/server_test.c
+++ b/test/core/surface/server_test.c
@@ -42,19 +42,25 @@ void test_register_method_fail(void) {
grpc_server *server = grpc_server_create(NULL, NULL);
void *method;
void *method_old;
- method = grpc_server_register_method(server, NULL, NULL, 0);
+ method =
+ grpc_server_register_method(server, NULL, NULL, GRPC_SRM_PAYLOAD_NONE, 0);
GPR_ASSERT(method == NULL);
- method_old = grpc_server_register_method(server, "m", "h", 0);
+ method_old =
+ grpc_server_register_method(server, "m", "h", GRPC_SRM_PAYLOAD_NONE, 0);
GPR_ASSERT(method_old != NULL);
- method = grpc_server_register_method(server, "m", "h", 0);
+ method = grpc_server_register_method(
+ server, "m", "h", GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0);
GPR_ASSERT(method == NULL);
- method_old = grpc_server_register_method(
- server, "m2", "h2", GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST);
+ method_old =
+ grpc_server_register_method(server, "m2", "h2", GRPC_SRM_PAYLOAD_NONE,
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST);
GPR_ASSERT(method_old != NULL);
- method = grpc_server_register_method(server, "m2", "h2", 0);
+ method =
+ grpc_server_register_method(server, "m2", "h2", GRPC_SRM_PAYLOAD_NONE, 0);
GPR_ASSERT(method == NULL);
method = grpc_server_register_method(
- server, "m2", "h2", GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST);
+ server, "m2", "h2", GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST);
GPR_ASSERT(method == NULL);
grpc_server_destroy(server);
}