aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.c20
-rw-r--r--src/core/lib/surface/init.c12
-rw-r--r--src/core/lib/surface/server.c30
3 files changed, 51 insertions, 11 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index d63a4a7401..77ad410b50 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -174,6 +174,9 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
+ /* Call stats: only valid after trailing metadata received */
+ grpc_transport_stream_stats stats;
+
/* Compression algorithm for the call */
grpc_compression_algorithm compression_algorithm;
/* Supported encodings (compression algorithms), a bitset */
@@ -909,7 +912,7 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
*(int *)dest = (status != GRPC_STATUS_OK);
}
-static int are_write_flags_valid(uint32_t flags) {
+static bool are_write_flags_valid(uint32_t flags) {
/* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
const uint32_t allowed_write_positions =
(GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
@@ -917,6 +920,15 @@ static int are_write_flags_valid(uint32_t flags) {
return !(flags & invalid_positions);
}
+static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
+ /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
+ uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
+ if (!is_client) {
+ invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
+ }
+ return !(flags & invalid_positions);
+}
+
static batch_control *allocate_batch_control(grpc_call *call) {
size_t i;
for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) {
@@ -1196,7 +1208,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
switch (op->op) {
case GRPC_OP_SEND_INITIAL_METADATA:
/* Flag validation: currently allow no flags */
- if (op->flags != 0) {
+ if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
error = GRPC_CALL_ERROR_INVALID_FLAGS;
goto done_with_error;
}
@@ -1220,6 +1232,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->metadata_batch[0][0].deadline = call->send_deadline;
stream_op.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
+ stream_op.send_idempotent_request =
+ (op->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1371,6 +1385,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->recv_final_op = 1;
stream_op.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+ stream_op.collect_stats = &call->stats;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
/* Flag validation: currently allow no flags */
@@ -1392,6 +1407,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->recv_final_op = 1;
stream_op.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+ stream_op.collect_stats = &call->stats;
break;
}
}
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index dbfc8a9336..3ccc21fb31 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -48,8 +48,6 @@
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/client_config/lb_policies/pick_first.h"
-#include "src/core/lib/client_config/lb_policies/round_robin.h"
#include "src/core/lib/client_config/lb_policy_registry.h"
#include "src/core/lib/client_config/resolver_registry.h"
#include "src/core/lib/client_config/resolvers/dns_resolver.h"
@@ -75,6 +73,9 @@
#define GRPC_DEFAULT_NAME_PREFIX "dns:///"
#endif
+/* (generated) built in registry of plugins */
+extern void grpc_register_built_in_plugins(void);
+
#define MAX_PLUGINS 128
static gpr_once g_basic_init = GPR_ONCE_INIT;
@@ -83,6 +84,7 @@ static int g_initializations;
static void do_basic_init(void) {
gpr_mu_init(&g_init_mu);
+ grpc_register_built_in_plugins();
/* TODO(ctiller): ideally remove this strict linkage */
grpc_register_plugin(census_grpc_plugin_init, census_grpc_plugin_destroy);
g_initializations = 0;
@@ -165,14 +167,12 @@ void grpc_init(void) {
gpr_time_init();
grpc_mdctx_global_init();
grpc_channel_init_init();
- grpc_lb_policy_registry_init(grpc_pick_first_lb_factory_create());
- grpc_register_lb_policy(grpc_pick_first_lb_factory_create());
- grpc_register_lb_policy(grpc_round_robin_lb_factory_create());
+ grpc_lb_policy_registry_init();
grpc_resolver_registry_init(GRPC_DEFAULT_NAME_PREFIX);
grpc_register_resolver_type(grpc_dns_resolver_factory_create());
grpc_register_resolver_type(grpc_ipv4_resolver_factory_create());
grpc_register_resolver_type(grpc_ipv6_resolver_factory_create());
-#ifdef GPR_POSIX_SOCKET
+#ifdef GPR_HAVE_UNIX_SOCKET
grpc_register_resolver_type(grpc_unix_resolver_factory_create());
#endif
grpc_register_tracer("api", &grpc_api_trace);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 080734e9d5..55e5e49e11 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -100,6 +100,7 @@ typedef struct requested_call {
typedef struct channel_registered_method {
registered_method *server_registered_method;
+ uint32_t flags;
grpc_mdstr *method;
grpc_mdstr *host;
} channel_registered_method;
@@ -152,6 +153,7 @@ struct call_data {
grpc_completion_queue *cq_new;
grpc_metadata_batch *recv_initial_metadata;
+ bool recv_idempotent_request;
grpc_metadata_array initial_metadata;
grpc_closure got_initial_metadata;
@@ -171,6 +173,7 @@ struct request_matcher {
struct registered_method {
char *method;
char *host;
+ uint32_t flags;
request_matcher request_matcher;
registered_method *next;
};
@@ -468,6 +471,9 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
if (!rm) break;
if (rm->host != calld->host) continue;
if (rm->method != calld->path) continue;
+ if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
+ !calld->recv_idempotent_request)
+ continue;
finish_start_new_rpc(exec_ctx, server, elem,
&rm->server_registered_method->request_matcher);
return;
@@ -480,6 +486,9 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
if (!rm) break;
if (rm->host != NULL) continue;
if (rm->method != calld->path) continue;
+ if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
+ !calld->recv_idempotent_request)
+ continue;
finish_start_new_rpc(exec_ctx, server, elem,
&rm->server_registered_method->request_matcher);
return;
@@ -598,9 +607,11 @@ static void server_mutate_op(grpc_call_element *elem,
call_data *calld = elem->call_data;
if (op->recv_initial_metadata != NULL) {
+ GPR_ASSERT(op->recv_idempotent_request == NULL);
calld->recv_initial_metadata = op->recv_initial_metadata;
calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata;
+ op->recv_idempotent_request = &calld->recv_idempotent_request;
}
}
@@ -830,10 +841,12 @@ static int streq(const char *a, const char *b) {
}
void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host) {
+ const char *host, uint32_t flags) {
registered_method *m;
- GRPC_API_TRACE("grpc_server_register_method(server=%p, method=%s, host=%s)",
- 3, (server, method, host));
+ GRPC_API_TRACE(
+ "grpc_server_register_method(server=%p, method=%s, host=%s, "
+ "flags=0x%08x)",
+ 4, (server, method, host, flags));
if (!method) {
gpr_log(GPR_ERROR,
"grpc_server_register_method method string cannot be NULL");
@@ -846,12 +859,18 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
return NULL;
}
}
+ if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) {
+ gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
+ flags);
+ return NULL;
+ }
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
request_matcher_init(&m->request_matcher, server->max_requested_calls);
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
+ m->flags = flags;
server->registered_methods = m;
return m;
}
@@ -930,6 +949,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
if (probes > max_probes) max_probes = probes;
crm = &chand->registered_methods[(hash + probes) % slots];
crm->server_registered_method = rm;
+ crm->flags = rm->flags;
crm->host = host;
crm->method = method;
}
@@ -1247,6 +1267,10 @@ static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
cpstr(&rc->data.batch.details->method,
&rc->data.batch.details->method_capacity, calld->path);
rc->data.batch.details->deadline = calld->deadline;
+ rc->data.batch.details->flags =
+ 0 | (calld->recv_idempotent_request
+ ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
+ : 0);
break;
case REGISTERED_CALL:
*rc->data.registered.deadline = calld->deadline;