aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c125
-rw-r--r--src/core/surface/call.h7
-rw-r--r--src/core/surface/call_log_batch.c8
-rw-r--r--src/core/surface/channel.c62
-rw-r--r--src/core/surface/channel.h5
-rw-r--r--src/core/surface/channel_connectivity.c185
-rw-r--r--src/core/surface/channel_create.c15
-rw-r--r--src/core/surface/completion_queue.c15
-rw-r--r--src/core/surface/init.c16
-rw-r--r--src/core/surface/lame_client.c20
-rw-r--r--src/core/surface/secure_channel_create.c15
-rw-r--r--src/core/surface/server.c461
-rw-r--r--src/core/surface/server.h6
-rw-r--r--src/core/surface/server_create.c5
14 files changed, 664 insertions, 281 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 71f4235571..327a096ffb 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -30,24 +30,24 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <grpc/compression.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
-#include "src/core/census/grpc_context.h"
-#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h"
+#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <assert.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
/** The maximum number of completions possible.
Based upon the maximum number of individually queueable ops in the batch
@@ -235,8 +235,8 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
- /* Compression level for the call */
- grpc_compression_level compression_level;
+ /* Compression algorithm for the call */
+ grpc_compression_algorithm compression_algorithm;
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -347,7 +347,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
}
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
CALL_STACK_FROM_CALL(call));
- if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) {
+ if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
+ 0) {
set_deadline_alarm(call, send_deadline);
}
return call;
@@ -469,9 +470,14 @@ static void set_status_code(grpc_call *call, status_source source,
}
}
-static void set_decode_compression_level(grpc_call *call,
- grpc_compression_level clevel) {
- call->compression_level = clevel;
+static void set_compression_algorithm(grpc_call *call,
+ grpc_compression_algorithm algo) {
+ call->compression_algorithm = algo;
+}
+
+grpc_compression_algorithm grpc_call_get_compression_algorithm(
+ const grpc_call *call) {
+ return call->compression_algorithm;
}
static void set_status_details(grpc_call *call, status_source source,
@@ -762,8 +768,18 @@ static void call_on_done_send(void *pc, int success) {
static void finish_message(grpc_call *call) {
if (call->error_status_set == 0) {
/* TODO(ctiller): this could be a lot faster if coded directly */
- grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
- call->incoming_message.slices, call->incoming_message.count);
+ grpc_byte_buffer *byte_buffer;
+ /* some aliases for readability */
+ gpr_slice *slices = call->incoming_message.slices;
+ const size_t nslices = call->incoming_message.count;
+
+ if ((call->incoming_message_flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
+ byte_buffer = grpc_raw_compressed_byte_buffer_create(
+ slices, nslices, call->compression_algorithm);
+ } else {
+ byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
+ }
grpc_bbq_push(&call->incoming_queue, byte_buffer);
}
gpr_slice_buffer_reset_and_unref(&call->incoming_message);
@@ -782,6 +798,25 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
gpr_free(message);
return 0;
}
+ /* sanity check: if message flags indicate a compressed message, the
+ * compression level should already be present in the call, as parsed off its
+ * corresponding metadata. */
+ if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (call->compression_algorithm == GRPC_COMPRESS_NONE)) {
+ char *message = NULL;
+ char *alg_name;
+ if (!grpc_compression_algorithm_name(call->compression_algorithm,
+ &alg_name)) {
+ /* This shouldn't happen, other than due to data corruption */
+ alg_name = "<unknown>";
+ }
+ gpr_asprintf(&message,
+ "Invalid compression algorithm (%s) for compressed message.",
+ alg_name);
+ cancel_with_status(call, GRPC_STATUS_INTERNAL, message);
+ gpr_free(message);
+ return 0;
+ }
/* stash away parameters, and prepare for incoming slices */
if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
char *message = NULL;
@@ -897,7 +932,7 @@ static int prepare_application_metadata(grpc_call *call, size_t count,
GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
(const gpr_uint8 *)md->value,
- md->value_length);
+ md->value_length, 1);
if (!grpc_mdstr_is_legal_header(l->md->key)) {
gpr_log(GPR_ERROR, "attempt to send invalid metadata key");
return 0;
@@ -985,7 +1020,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) {
mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
data.send_metadata.metadata);
mdb.garbage.head = mdb.garbage.tail = NULL;
- mdb.deadline = gpr_inf_future;
+ mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
@@ -1168,7 +1203,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
const char *description) {
grpc_mdstr *details =
- description ? grpc_mdstr_from_string(c->metadata_context, description)
+ description ? grpc_mdstr_from_string(c->metadata_context, description, 0)
: NULL;
GPR_ASSERT(status != GRPC_STATUS_OK);
@@ -1218,6 +1253,11 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
elem->filter->start_transport_stream_op(elem, op);
}
+char *grpc_call_get_peer(grpc_call *call) {
+ grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
+ return elem->filter->get_peer(elem);
+}
+
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
@@ -1243,8 +1283,9 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
}
GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1;
- grpc_alarm_init(&call->alarm, deadline, call_alarm, call,
- gpr_now(GPR_CLOCK_REALTIME));
+ grpc_alarm_init(&call->alarm,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ call_alarm, call, gpr_now(GPR_CLOCK_MONOTONIC));
}
/* we offset status by a small amount when storing it into transport metadata
@@ -1276,25 +1317,22 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
static void destroy_compression(void *ignored) {}
static gpr_uint32 decode_compression(grpc_mdelem *md) {
- grpc_compression_level clevel;
- void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
+ grpc_compression_algorithm algorithm;
+ void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
if (user_data) {
- clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
+ algorithm =
+ ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
} else {
- gpr_uint32 parsed_clevel_bytes;
- if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
- GPR_SLICE_LENGTH(md->value->slice),
- &parsed_clevel_bytes)) {
- /* the following cast is safe, as a gpr_uint32 should be able to hold all
- * possible values of the grpc_compression_level enum */
- clevel = (grpc_compression_level)parsed_clevel_bytes;
- } else {
- clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
+ const char *md_c_str = grpc_mdstr_as_c_string(md->value);
+ if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) {
+ gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
+ assert(0);
}
- grpc_mdelem_set_user_data(md, destroy_compression,
- (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
+ grpc_mdelem_set_user_data(
+ md, destroy_compression,
+ (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
}
- return clevel;
+ return algorithm;
}
static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
@@ -1313,8 +1351,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value));
} else if (key ==
- grpc_channel_get_compresssion_level_string(call->channel)) {
- set_decode_compression_level(call, decode_compression(md));
+ grpc_channel_get_compression_algorithm_string(call->channel)) {
+ set_compression_algorithm(call, decode_compression(md));
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
@@ -1338,7 +1376,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
l->md = 0;
}
}
- if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) {
+ if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
+ 0) {
set_deadline_alarm(call, md->deadline);
}
if (!is_trailing) {
@@ -1429,7 +1468,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
- req->flags = ops->flags;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Flag validation: currently allow no flags */
@@ -1461,7 +1500,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.send_status_from_server.status_details != NULL
? grpc_mdstr_from_string(
call->metadata_context,
- op->data.send_status_from_server.status_details)
+ op->data.send_status_from_server.status_details, 0)
: NULL;
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_CLOSE;
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 3b6f9c942e..265638d519 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -134,6 +134,10 @@ void grpc_server_log_request_call(char *file, int line,
grpc_completion_queue *cq_for_notification,
void *tag);
+void grpc_server_log_shutdown(char *file, int line, gpr_log_severity severity,
+ grpc_server *server, grpc_completion_queue *cq,
+ void *tag);
+
/* Set a context pointer.
No thread safety guarantees are made wrt this value. */
void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
@@ -151,6 +155,9 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem);
grpc_server_log_request_call(sev, server, call, details, initial_metadata, \
cq_bound_to_call, cq_for_notifications, tag)
+#define GRPC_SERVER_LOG_SHUTDOWN(sev, server, cq, tag) \
+ if (grpc_trace_batch) grpc_server_log_shutdown(sev, server, cq, tag)
+
gpr_uint8 grpc_call_is_client(grpc_call *call);
#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */
diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c
index 997046d954..7bf8cafc24 100644
--- a/src/core/surface/call_log_batch.c
+++ b/src/core/surface/call_log_batch.c
@@ -136,3 +136,11 @@ void grpc_server_log_request_call(char *file, int line,
"tag=%p)", server, call, details, initial_metadata,
cq_bound_to_call, cq_for_notification, tag);
}
+
+void grpc_server_log_shutdown(char *file, int line, gpr_log_severity severity,
+ grpc_server *server, grpc_completion_queue *cq,
+ void *tag) {
+ gpr_log(file, line, severity,
+ "grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", server,
+ cq, tag);
+}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index b7826d4dfc..81f673f856 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -36,12 +36,14 @@
#include <stdlib.h>
#include <string.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include "src/core/surface/call.h"
#include "src/core/surface/init.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
/** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS.
* Avoids needing to take a metadata context lock for sending status
@@ -63,7 +65,7 @@ struct grpc_channel {
grpc_mdctx *metadata_context;
/** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string;
- grpc_mdstr *grpc_compression_level_string;
+ grpc_mdstr *grpc_compression_algorithm_string;
grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string;
grpc_mdstr *authority_string;
@@ -73,6 +75,7 @@ struct grpc_channel {
gpr_mu registered_call_mu;
registered_call *registered_calls;
grpc_iomgr_closure destroy_closure;
+ char *target;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
@@ -85,31 +88,32 @@ struct grpc_channel {
#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
grpc_channel *grpc_channel_create_from_filters(
- const grpc_channel_filter **filters, size_t num_filters,
+ const char *target, const grpc_channel_filter **filters, size_t num_filters,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
size_t i;
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
grpc_channel *channel = gpr_malloc(size);
memset(channel, 0, sizeof(*channel));
+ channel->target = gpr_strdup(target);
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
channel->is_client = is_client;
/* decremented by grpc_channel_destroy */
gpr_ref_init(&channel->refs, 1);
channel->metadata_context = mdctx;
- channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
- channel->grpc_compression_level_string =
- grpc_mdstr_from_string(mdctx, "grpc-compression-level");
- channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
+ channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0);
+ channel->grpc_compression_algorithm_string =
+ grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
+ channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message", 0);
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
char buf[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(i, buf);
channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings(
mdctx, GRPC_MDSTR_REF(channel->grpc_status_string),
- grpc_mdstr_from_string(mdctx, buf));
+ grpc_mdstr_from_string(mdctx, buf, 0));
}
- channel->path_string = grpc_mdstr_from_string(mdctx, ":path");
- channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority");
+ channel->path_string = grpc_mdstr_from_string(mdctx, ":path", 0);
+ channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority", 0);
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
@@ -137,18 +141,25 @@ grpc_channel *grpc_channel_create_from_filters(
return channel;
}
+char *grpc_channel_get_target(grpc_channel *channel) {
+ return gpr_strdup(channel->target);
+}
+
static grpc_call *grpc_channel_create_call_internal(
grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
grpc_mdelem *authority_mdelem, gpr_timespec deadline) {
grpc_mdelem *send_metadata[2];
+ int num_metadata = 0;
GPR_ASSERT(channel->is_client);
- send_metadata[0] = path_mdelem;
- send_metadata[1] = authority_mdelem;
+ send_metadata[num_metadata++] = path_mdelem;
+ if (authority_mdelem != NULL) {
+ send_metadata[num_metadata++] = authority_mdelem;
+ }
return grpc_call_create(channel, cq, NULL, send_metadata,
- GPR_ARRAY_SIZE(send_metadata), deadline);
+ num_metadata, deadline);
}
grpc_call *grpc_channel_create_call(grpc_channel *channel,
@@ -159,10 +170,11 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
channel, cq,
grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
- grpc_mdstr_from_string(channel->metadata_context, method)),
+ grpc_mdstr_from_string(channel->metadata_context, method, 0)),
+ host ?
grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string),
- grpc_mdstr_from_string(channel->metadata_context, host)),
+ grpc_mdstr_from_string(channel->metadata_context, host, 0)) : NULL,
deadline);
}
@@ -171,10 +183,10 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
registered_call *rc = gpr_malloc(sizeof(registered_call));
rc->path = grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
- grpc_mdstr_from_string(channel->metadata_context, method));
- rc->authority = grpc_mdelem_from_metadata_strings(
+ grpc_mdstr_from_string(channel->metadata_context, method, 0));
+ rc->authority = host ? grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string),
- grpc_mdstr_from_string(channel->metadata_context, host));
+ grpc_mdstr_from_string(channel->metadata_context, host, 0)) : NULL;
gpr_mu_lock(&channel->registered_call_mu);
rc->next = channel->registered_calls;
channel->registered_calls = rc;
@@ -188,7 +200,7 @@ grpc_call *grpc_channel_create_registered_call(
registered_call *rc = registered_call_handle;
return grpc_channel_create_call_internal(
channel, completion_queue, GRPC_MDELEM_REF(rc->path),
- GRPC_MDELEM_REF(rc->authority), deadline);
+ rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline);
}
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
@@ -209,7 +221,7 @@ static void destroy_channel(void *p, int ok) {
GRPC_MDELEM_UNREF(channel->grpc_status_elem[i]);
}
GRPC_MDSTR_UNREF(channel->grpc_status_string);
- GRPC_MDSTR_UNREF(channel->grpc_compression_level_string);
+ GRPC_MDSTR_UNREF(channel->grpc_compression_algorithm_string);
GRPC_MDSTR_UNREF(channel->grpc_message_string);
GRPC_MDSTR_UNREF(channel->path_string);
GRPC_MDSTR_UNREF(channel->authority_string);
@@ -222,6 +234,7 @@ static void destroy_channel(void *p, int ok) {
}
grpc_mdctx_unref(channel->metadata_context);
gpr_mu_destroy(&channel->registered_call_mu);
+ gpr_free(channel->target);
gpr_free(channel);
}
@@ -262,8 +275,9 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
return channel->grpc_status_string;
}
-grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) {
- return channel->grpc_compression_level_string;
+grpc_mdstr *grpc_channel_get_compression_algorithm_string(
+ grpc_channel *channel) {
+ return channel->grpc_compression_algorithm_string;
}
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
@@ -274,7 +288,7 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
gpr_ltoa(i, tmp);
return grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->grpc_status_string),
- grpc_mdstr_from_string(channel->metadata_context, tmp));
+ grpc_mdstr_from_string(channel->metadata_context, tmp, 0));
}
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 71f8a55731..9e0646efaa 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -38,7 +38,7 @@
#include "src/core/client_config/subchannel_factory.h"
grpc_channel *grpc_channel_create_from_filters(
- const grpc_channel_filter **filters, size_t count,
+ const char *target, const grpc_channel_filter **filters, size_t count,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client);
/** Get a (borrowed) pointer to this channels underlying channel stack */
@@ -54,7 +54,8 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
int status_code);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
-grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel);
+grpc_mdstr *grpc_channel_get_compression_algorithm_string(
+ grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
new file mode 100644
index 0000000000..1223706457
--- /dev/null
+++ b/src/core/surface/channel_connectivity.c
@@ -0,0 +1,185 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/channel.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/channel/client_channel.h"
+#include "src/core/iomgr/alarm.h"
+#include "src/core/surface/completion_queue.h"
+
+grpc_connectivity_state grpc_channel_check_connectivity_state(
+ grpc_channel *channel, int try_to_connect) {
+ /* forward through to the underlying client channel */
+ grpc_channel_element *client_channel_elem =
+ grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+ if (client_channel_elem->filter != &grpc_client_channel_filter) {
+ gpr_log(GPR_ERROR,
+ "grpc_channel_check_connectivity_state called on something that is "
+ "not a client channel, but '%s'",
+ client_channel_elem->filter->name);
+ return GRPC_CHANNEL_FATAL_FAILURE;
+ }
+ return grpc_client_channel_check_connectivity_state(client_channel_elem,
+ try_to_connect);
+}
+
+typedef enum {
+ WAITING,
+ CALLING_BACK,
+ CALLING_BACK_AND_FINISHED,
+ CALLED_BACK
+} callback_phase;
+
+typedef struct {
+ gpr_mu mu;
+ callback_phase phase;
+ int success;
+ grpc_iomgr_closure on_complete;
+ grpc_alarm alarm;
+ grpc_connectivity_state state;
+ grpc_completion_queue *cq;
+ grpc_cq_completion completion_storage;
+ grpc_channel *channel;
+ void *tag;
+} state_watcher;
+
+static void delete_state_watcher(state_watcher *w) {
+ grpc_channel_element *client_channel_elem =
+ grpc_channel_stack_last_element(grpc_channel_get_channel_stack(w->channel));
+ grpc_client_channel_del_interested_party(client_channel_elem, grpc_cq_pollset(w->cq));
+ GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity");
+ gpr_mu_destroy(&w->mu);
+ gpr_free(w);
+}
+
+static void finished_completion(void *pw, grpc_cq_completion *ignored) {
+ int delete = 0;
+ state_watcher *w = pw;
+ gpr_mu_lock(&w->mu);
+ switch (w->phase) {
+ case WAITING:
+ case CALLED_BACK:
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ break;
+ case CALLING_BACK:
+ w->phase = CALLED_BACK;
+ break;
+ case CALLING_BACK_AND_FINISHED:
+ delete = 1;
+ break;
+ }
+ gpr_mu_unlock(&w->mu);
+
+ if (delete) {
+ delete_state_watcher(w);
+ }
+}
+
+static void partly_done(state_watcher *w, int due_to_completion) {
+ int delete = 0;
+
+ if (due_to_completion) {
+ gpr_mu_lock(&w->mu);
+ w->success = 1;
+ gpr_mu_unlock(&w->mu);
+ grpc_alarm_cancel(&w->alarm);
+ }
+
+ gpr_mu_lock(&w->mu);
+ switch (w->phase) {
+ case WAITING:
+ w->phase = CALLING_BACK;
+ grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w,
+ &w->completion_storage);
+ break;
+ case CALLING_BACK:
+ w->phase = CALLING_BACK_AND_FINISHED;
+ break;
+ case CALLING_BACK_AND_FINISHED:
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ break;
+ case CALLED_BACK:
+ delete = 1;
+ break;
+ }
+ gpr_mu_unlock(&w->mu);
+
+ if (delete) {
+ delete_state_watcher(w);
+ }
+}
+
+static void watch_complete(void *pw, int success) { partly_done(pw, 1); }
+
+static void timeout_complete(void *pw, int success) { partly_done(pw, 0); }
+
+void grpc_channel_watch_connectivity_state(
+ grpc_channel *channel, grpc_connectivity_state last_observed_state,
+ gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
+ grpc_channel_element *client_channel_elem =
+ grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+ state_watcher *w = gpr_malloc(sizeof(*w));
+
+ grpc_cq_begin_op(cq);
+
+ gpr_mu_init(&w->mu);
+ grpc_iomgr_closure_init(&w->on_complete, watch_complete, w);
+ w->phase = WAITING;
+ w->state = last_observed_state;
+ w->success = 0;
+ w->cq = cq;
+ w->tag = tag;
+ w->channel = channel;
+
+ grpc_alarm_init(
+ &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
+
+ if (client_channel_elem->filter != &grpc_client_channel_filter) {
+ gpr_log(GPR_ERROR,
+ "grpc_channel_watch_connectivity_state called on something that is "
+ "not a client channel, but '%s'",
+ client_channel_elem->filter->name);
+ grpc_iomgr_add_delayed_callback(&w->on_complete, 1);
+ } else {
+ GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
+ grpc_client_channel_add_interested_party(client_channel_elem, grpc_cq_pollset(cq));
+ grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state,
+ &w->on_complete);
+ }
+}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index e205f0a9f8..707d615688 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -40,6 +40,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
+#include "src/core/channel/compress_filter.h"
#include "src/core/channel/http_client_filter.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/iomgr/tcp_client.h"
@@ -108,6 +109,7 @@ typedef struct {
gpr_refcount refs;
grpc_mdctx *mdctx;
grpc_channel_args *merge_args;
+ grpc_channel *master;
} subchannel_factory;
static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
@@ -118,6 +120,7 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
subchannel_factory *f = (subchannel_factory *)scf;
if (gpr_unref(&f->refs)) {
+ GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory");
grpc_channel_args_destroy(f->merge_args);
grpc_mdctx_unref(f->mdctx);
gpr_free(f);
@@ -136,6 +139,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
gpr_ref_init(&c->refs, 1);
args->mdctx = f->mdctx;
args->args = final_args;
+ args->master = f->master;
s = grpc_subchannel_create(&c->base, args);
grpc_connector_unref(&c->base);
grpc_channel_args_destroy(final_args);
@@ -150,8 +154,8 @@ static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
Asynchronously: - resolve target
- connect to it (trying alternatives as presented)
- perform handshakes */
-grpc_channel *grpc_channel_create(const char *target,
- const grpc_channel_args *args) {
+grpc_channel *grpc_insecure_channel_create(const char *target,
+ const grpc_channel_args *args) {
grpc_channel *channel = NULL;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
@@ -163,21 +167,26 @@ grpc_channel *grpc_channel_create(const char *target,
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
} */
+ filters[n++] = &grpc_compress_filter;
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
+ channel =
+ grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1);
+
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
gpr_ref_init(&f->refs, 1);
grpc_mdctx_ref(mdctx);
f->mdctx = mdctx;
f->merge_args = grpc_channel_args_copy(args);
+ f->master = channel;
+ GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
resolver = grpc_resolver_create(target, &f->base);
if (!resolver) {
return NULL;
}
- channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
resolver);
GRPC_RESOLVER_UNREF(resolver, "create");
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index c67f75fc5c..3f60b0b0ba 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -116,7 +116,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) {
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
void (*done)(void *done_arg, grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- int shutdown = gpr_unref(&cc->pending_events);
+ int shutdown;
storage->tag = tag;
storage->done = done;
@@ -124,15 +124,15 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
storage->next =
((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ shutdown = gpr_unref(&cc->pending_events);
if (!shutdown) {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
cc->completed_tail->next =
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
grpc_pollset_kick(&cc->pollset);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
} else {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
cc->completed_tail->next =
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
@@ -148,6 +148,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
grpc_event ret;
+ deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
@@ -188,6 +190,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_cq_completion *c;
grpc_cq_completion *prev;
+ deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
@@ -260,8 +264,9 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
grpc_pollset_kick(&cc->pollset);
- grpc_pollset_work(&cc->pollset, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(100)));
+ grpc_pollset_work(&cc->pollset,
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(100, GPR_TIMESPAN)));
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 04e27d30ac..442bc72f21 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -39,6 +39,7 @@
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/client_config/resolvers/dns_resolver.h"
+#include "src/core/client_config/resolvers/sockaddr_resolver.h"
#include "src/core/debug/trace.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/profiling/timers.h"
@@ -46,10 +47,7 @@
#include "src/core/surface/init.h"
#include "src/core/surface/surface_trace.h"
#include "src/core/transport/chttp2_transport.h"
-
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/client_config/resolvers/unix_resolver_posix.h"
-#endif
+#include "src/core/transport/connectivity_state.h"
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
@@ -68,6 +66,8 @@ void grpc_init(void) {
gpr_time_init();
grpc_resolver_registry_init("dns:///");
grpc_register_resolver_type("dns", grpc_dns_resolver_factory_create());
+ grpc_register_resolver_type("ipv4", grpc_ipv4_resolver_factory_create());
+ grpc_register_resolver_type("ipv6", grpc_ipv6_resolver_factory_create());
#ifdef GPR_POSIX_SOCKET
grpc_register_resolver_type("unix", grpc_unix_resolver_factory_create());
#endif
@@ -76,11 +76,15 @@ void grpc_init(void) {
grpc_register_tracer("http", &grpc_http_trace);
grpc_register_tracer("flowctl", &grpc_flowctl_trace);
grpc_register_tracer("batch", &grpc_trace_batch);
+ grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace);
grpc_security_pre_init();
grpc_iomgr_init();
grpc_tracer_init("GRPC_TRACE");
- if (census_initialize(CENSUS_NONE)) {
- gpr_log(GPR_ERROR, "Could not initialize census.");
+ /* Only initialize census if noone else has. */
+ if (census_enabled() == CENSUS_FEATURE_NONE) {
+ if (census_initialize(census_supported())) { /* enable all features. */
+ gpr_log(GPR_ERROR, "Could not initialize census.");
+ }
}
grpc_timers_global_init();
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 3dd56fe5a9..c4215a2cfb 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -47,7 +47,10 @@ typedef struct {
grpc_linked_mdelem details;
} call_data;
-typedef struct { grpc_mdctx *mdctx; } channel_data;
+typedef struct {
+ grpc_mdctx *mdctx;
+ grpc_channel *master;
+} channel_data;
static void lame_start_transport_stream_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
@@ -72,7 +75,7 @@ static void lame_start_transport_stream_op(grpc_call_element *elem,
mdb.list.head = &calld->status;
mdb.list.tail = &calld->details;
mdb.garbage.head = mdb.garbage.tail = NULL;
- mdb.deadline = gpr_inf_future;
+ mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_sopb_add_metadata(op->recv_ops, mdb);
*op->recv_state = GRPC_STREAM_CLOSED;
op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
@@ -82,6 +85,11 @@ static void lame_start_transport_stream_op(grpc_call_element *elem,
}
}
+static char *lame_get_peer(grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ return grpc_channel_get_target(chand->master);
+}
+
static void lame_start_transport_op(grpc_channel_element *elem,
grpc_transport_op *op) {
if (op->on_connectivity_state_change) {
@@ -112,6 +120,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
GPR_ASSERT(is_first);
GPR_ASSERT(is_last);
chand->mdctx = mdctx;
+ chand->master = master;
}
static void destroy_channel_elem(grpc_channel_element *elem) {}
@@ -125,11 +134,12 @@ static const grpc_channel_filter lame_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ lame_get_peer,
"lame-client",
};
-grpc_channel *grpc_lame_client_channel_create(void) {
+grpc_channel *grpc_lame_client_channel_create(const char *target) {
static const grpc_channel_filter *filters[] = {&lame_filter};
- return grpc_channel_create_from_filters(filters, 1, NULL, grpc_mdctx_create(),
- 1);
+ return grpc_channel_create_from_filters(target, filters, 1, NULL,
+ grpc_mdctx_create(), 1);
}
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index f3c7d8397b..1f89353025 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -40,6 +40,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
+#include "src/core/channel/compress_filter.h"
#include "src/core/channel/http_client_filter.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/iomgr/tcp_client.h"
@@ -133,6 +134,7 @@ typedef struct {
grpc_mdctx *mdctx;
grpc_channel_args *merge_args;
grpc_channel_security_connector *security_connector;
+ grpc_channel *master;
} subchannel_factory;
static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
@@ -145,6 +147,7 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
if (gpr_unref(&f->refs)) {
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
"subchannel_factory");
+ GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory");
grpc_channel_args_destroy(f->merge_args);
grpc_mdctx_unref(f->mdctx);
gpr_free(f);
@@ -164,6 +167,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
gpr_ref_init(&c->refs, 1);
args->mdctx = f->mdctx;
args->args = final_args;
+ args->master = f->master;
s = grpc_subchannel_create(&c->base, args);
grpc_connector_unref(&c->base);
grpc_channel_args_destroy(final_args);
@@ -195,13 +199,13 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
if (grpc_find_security_connector_in_args(args) != NULL) {
gpr_log(GPR_ERROR, "Cannot set security context in channel args.");
- return grpc_lame_client_channel_create();
+ return grpc_lame_client_channel_create(target);
}
if (grpc_credentials_create_security_connector(
creds, target, args, NULL, &connector, &new_args_from_connector) !=
GRPC_SECURITY_OK) {
- return grpc_lame_client_channel_create();
+ return grpc_lame_client_channel_create(target);
}
mdctx = grpc_mdctx_create();
@@ -213,9 +217,13 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
} */
+ filters[n++] = &grpc_compress_filter;
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
+ channel =
+ grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1);
+
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
gpr_ref_init(&f->refs, 1);
@@ -224,12 +232,13 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
GRPC_SECURITY_CONNECTOR_REF(&connector->base, "subchannel_factory");
f->security_connector = connector;
f->merge_args = grpc_channel_args_copy(args_copy);
+ f->master = channel;
+ GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
resolver = grpc_resolver_create(target, &f->base);
if (!resolver) {
return NULL;
}
- channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
resolver);
GRPC_RESOLVER_UNREF(resolver, "create");
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 4dc51bf031..c370a9b8ab 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -36,22 +36,22 @@
#include <stdlib.h>
#include <string.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
#include "src/core/channel/census_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/iomgr.h"
+#include "src/core/support/stack_lockfree.h"
#include "src/core/support/string.h"
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/init.h"
#include "src/core/transport/metadata.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/useful.h>
-
-typedef enum { PENDING_START, CALL_LIST_COUNT } call_list;
typedef struct listener {
void *arg;
@@ -74,8 +74,8 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct requested_call {
requested_call_type type;
- struct requested_call *next;
void *tag;
+ grpc_server *server;
grpc_completion_queue *cq_bound_to_call;
grpc_completion_queue *cq_for_notification;
grpc_call **call;
@@ -94,14 +94,6 @@ typedef struct requested_call {
} data;
} requested_call;
-struct registered_method {
- char *method;
- char *host;
- call_data *pending;
- requested_call *requests;
- registered_method *next;
-};
-
typedef struct channel_registered_method {
registered_method *server_registered_method;
grpc_mdstr *method;
@@ -130,44 +122,6 @@ typedef struct shutdown_tag {
grpc_cq_completion completion;
} shutdown_tag;
-struct grpc_server {
- size_t channel_filter_count;
- const grpc_channel_filter **channel_filters;
- grpc_channel_args *channel_args;
-
- grpc_completion_queue **cqs;
- grpc_pollset **pollsets;
- size_t cq_count;
-
- /* The two following mutexes control access to server-state
- mu_global controls access to non-call-related state (e.g., channel state)
- mu_call controls access to call-related state (e.g., the call lists)
-
- If they are ever required to be nested, you must lock mu_global
- before mu_call. This is currently used in shutdown processing
- (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
- gpr_mu mu_global; /* mutex for server and channel state */
- gpr_mu mu_call; /* mutex for call-specific state */
-
- registered_method *registered_methods;
- requested_call *requests;
-
- gpr_uint8 shutdown;
- gpr_uint8 shutdown_published;
- size_t num_shutdown_tags;
- shutdown_tag *shutdown_tags;
-
- call_data *lists[CALL_LIST_COUNT];
- channel_data root_channel_data;
-
- listener *listeners;
- int listeners_destroyed;
- gpr_refcount internal_refcount;
-
- /** when did we print the last shutdown progress message */
- gpr_timespec last_shutdown_message_time;
-};
-
typedef enum {
/* waiting for metadata */
NOT_STARTED,
@@ -179,6 +133,8 @@ typedef enum {
ZOMBIED
} call_state;
+typedef struct request_matcher request_matcher;
+
struct call_data {
grpc_call *call;
@@ -201,8 +157,20 @@ struct call_data {
grpc_iomgr_closure server_on_recv;
grpc_iomgr_closure kill_zombie_closure;
- call_data **root[CALL_LIST_COUNT];
- call_link links[CALL_LIST_COUNT];
+ call_data *pending_next;
+};
+
+struct request_matcher {
+ call_data *pending_head;
+ call_data *pending_tail;
+ gpr_stack_lockfree *requests;
+};
+
+struct registered_method {
+ char *method;
+ char *host;
+ request_matcher request_matcher;
+ registered_method *next;
};
typedef struct {
@@ -210,6 +178,48 @@ typedef struct {
size_t num_channels;
} channel_broadcaster;
+struct grpc_server {
+ size_t channel_filter_count;
+ const grpc_channel_filter **channel_filters;
+ grpc_channel_args *channel_args;
+
+ grpc_completion_queue **cqs;
+ grpc_pollset **pollsets;
+ size_t cq_count;
+
+ /* The two following mutexes control access to server-state
+ mu_global controls access to non-call-related state (e.g., channel state)
+ mu_call controls access to call-related state (e.g., the call lists)
+
+ If they are ever required to be nested, you must lock mu_global
+ before mu_call. This is currently used in shutdown processing
+ (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
+ gpr_mu mu_global; /* mutex for server and channel state */
+ gpr_mu mu_call; /* mutex for call-specific state */
+
+ registered_method *registered_methods;
+ request_matcher unregistered_request_matcher;
+ /** free list of available requested_calls indices */
+ gpr_stack_lockfree *request_freelist;
+ /** requested call backing data */
+ requested_call *requested_calls;
+ int max_requested_calls;
+
+ gpr_atm shutdown_flag;
+ gpr_uint8 shutdown_published;
+ size_t num_shutdown_tags;
+ shutdown_tag *shutdown_tags;
+
+ channel_data root_channel_data;
+
+ listener *listeners;
+ int listeners_destroyed;
+ gpr_refcount internal_refcount;
+
+ /** when did we print the last shutdown progress message */
+ gpr_timespec last_shutdown_message_time;
+};
+
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data *)(elem)->channel_data)->server)
@@ -220,7 +230,9 @@ static void fail_call(grpc_server *server, requested_call *rc);
hold mu_call */
static void maybe_finish_shutdown(grpc_server *server);
-/* channel broadcaster */
+/*
+ * channel broadcaster
+ */
/* assumes server locked */
static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
@@ -281,55 +293,44 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb,
gpr_free(cb->channels);
}
-/* call list */
+/*
+ * request_matcher
+ */
-static int call_list_join(call_data **root, call_data *call, call_list list) {
- GPR_ASSERT(!call->root[list]);
- call->root[list] = root;
- if (!*root) {
- *root = call;
- call->links[list].next = call->links[list].prev = call;
- } else {
- call->links[list].next = *root;
- call->links[list].prev = (*root)->links[list].prev;
- call->links[list].next->links[list].prev =
- call->links[list].prev->links[list].next = call;
- }
- return 1;
+static void request_matcher_init(request_matcher *request_matcher,
+ int entries) {
+ memset(request_matcher, 0, sizeof(*request_matcher));
+ request_matcher->requests = gpr_stack_lockfree_create(entries);
}
-static call_data *call_list_remove_head(call_data **root, call_list list) {
- call_data *out = *root;
- if (out) {
- out->root[list] = NULL;
- if (out->links[list].next == out) {
- *root = NULL;
- } else {
- *root = out->links[list].next;
- out->links[list].next->links[list].prev = out->links[list].prev;
- out->links[list].prev->links[list].next = out->links[list].next;
- }
- }
- return out;
+static void request_matcher_destroy(request_matcher *request_matcher) {
+ GPR_ASSERT(gpr_stack_lockfree_pop(request_matcher->requests) == -1);
+ gpr_stack_lockfree_destroy(request_matcher->requests);
}
-static int call_list_remove(call_data *call, call_list list) {
- call_data **root = call->root[list];
- if (root == NULL) return 0;
- call->root[list] = NULL;
- if (*root == call) {
- *root = call->links[list].next;
- if (*root == call) {
- *root = NULL;
- return 1;
- }
+static void kill_zombie(void *elem, int success) {
+ grpc_call_destroy(grpc_call_from_top_element(elem));
+}
+
+static void request_matcher_zombify_all_pending_calls(
+ request_matcher *request_matcher) {
+ while (request_matcher->pending_head) {
+ call_data *calld = request_matcher->pending_head;
+ request_matcher->pending_head = calld->pending_next;
+ gpr_mu_lock(&calld->mu_state);
+ calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_iomgr_closure_init(
+ &calld->kill_zombie_closure, kill_zombie,
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
- GPR_ASSERT(*root != call);
- call->links[list].next->links[list].prev = call->links[list].prev;
- call->links[list].prev->links[list].next = call->links[list].next;
- return 1;
}
+/*
+ * server proper
+ */
+
static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
@@ -343,6 +344,7 @@ static void server_delete(grpc_server *server) {
gpr_free(server->channel_filters);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
+ request_matcher_destroy(&rm->request_matcher);
gpr_free(rm->method);
gpr_free(rm->host);
gpr_free(rm);
@@ -350,9 +352,12 @@ static void server_delete(grpc_server *server) {
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
}
+ request_matcher_destroy(&server->unregistered_request_matcher);
+ gpr_stack_lockfree_destroy(server->request_freelist);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
+ gpr_free(server->requested_calls);
gpr_free(server);
}
@@ -391,25 +396,38 @@ static void destroy_channel(channel_data *chand) {
}
static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
- call_data **pending_root,
- requested_call **requests) {
- requested_call *rc;
+ request_matcher *request_matcher) {
call_data *calld = elem->call_data;
- gpr_mu_lock(&server->mu_call);
- rc = *requests;
- if (rc == NULL) {
+ int request_id;
+
+ if (gpr_atm_acq_load(&server->shutdown_flag)) {
+ gpr_mu_lock(&calld->mu_state);
+ calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ return;
+ }
+
+ request_id = gpr_stack_lockfree_pop(request_matcher->requests);
+ if (request_id == -1) {
+ gpr_mu_lock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
calld->state = PENDING;
gpr_mu_unlock(&calld->mu_state);
- call_list_join(pending_root, calld, PENDING_START);
+ if (request_matcher->pending_head == NULL) {
+ request_matcher->pending_tail = request_matcher->pending_head = calld;
+ } else {
+ request_matcher->pending_tail->pending_next = calld;
+ request_matcher->pending_tail = calld;
+ }
+ calld->pending_next = NULL;
gpr_mu_unlock(&server->mu_call);
} else {
- *requests = rc->next;
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- gpr_mu_unlock(&server->mu_call);
- begin_call(server, calld, rc);
+ begin_call(server, calld, &server->requested_calls[request_id]);
}
}
@@ -431,8 +449,8 @@ static void start_new_rpc(grpc_call_element *elem) {
if (!rm) break;
if (rm->host != calld->host) continue;
if (rm->method != calld->path) continue;
- finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
- &rm->server_registered_method->requests);
+ finish_start_new_rpc(server, elem,
+ &rm->server_registered_method->request_matcher);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -443,17 +461,12 @@ static void start_new_rpc(grpc_call_element *elem) {
if (!rm) break;
if (rm->host != NULL) continue;
if (rm->method != calld->path) continue;
- finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
- &rm->server_registered_method->requests);
+ finish_start_new_rpc(server, elem,
+ &rm->server_registered_method->request_matcher);
return;
}
}
- finish_start_new_rpc(server, elem, &server->lists[PENDING_START],
- &server->requests);
-}
-
-static void kill_zombie(void *elem, int success) {
- grpc_call_destroy(grpc_call_from_top_element(elem));
+ finish_start_new_rpc(server, elem, &server->unregistered_request_matcher);
}
static int num_listeners(grpc_server *server) {
@@ -481,15 +494,15 @@ static int num_channels(grpc_server *server) {
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
- if (!server->shutdown || server->shutdown_published) {
+ if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
return;
}
if (server->root_channel_data.next != &server->root_channel_data ||
server->listeners_destroyed < num_listeners(server)) {
- if (gpr_time_cmp(
- gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), server->last_shutdown_message_time),
- gpr_time_from_seconds(1)) >= 0) {
+ if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
+ server->last_shutdown_message_time),
+ gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
gpr_log(GPR_DEBUG,
"Waiting for %d channels and %d/%d listeners to be destroyed"
@@ -526,7 +539,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
static void server_on_recv(void *ptr, int success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
+ gpr_timespec op_deadline;
if (success && !calld->got_initial_metadata) {
size_t i;
@@ -536,11 +549,15 @@ static void server_on_recv(void *ptr, int success) {
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
- if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
+ op_deadline = op->data.metadata.deadline;
+ if (0 !=
+ gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) {
calld->deadline = op->data.metadata.deadline;
}
- calld->got_initial_metadata = 1;
- start_new_rpc(elem);
+ if (calld->host && calld->path) {
+ calld->got_initial_metadata = 1;
+ start_new_rpc(elem);
+ }
break;
}
}
@@ -571,11 +588,8 @@ static void server_on_recv(void *ptr, int success) {
} else if (calld->state == PENDING) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- gpr_mu_lock(&chand->server->mu_call);
- call_list_remove(calld, PENDING_START);
- gpr_mu_unlock(&chand->server->mu_call);
- grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ /* zombied call will be destroyed when it's removed from the pending
+ queue... later */
} else {
gpr_mu_unlock(&calld->mu_state);
}
@@ -610,7 +624,7 @@ static void accept_stream(void *cd, grpc_transport *transport,
channel_data *chand = cd;
/* create a call */
grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0,
- gpr_inf_future);
+ gpr_inf_future(GPR_CLOCK_REALTIME));
}
static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
@@ -638,7 +652,7 @@ static void init_call_elem(grpc_call_element *elem,
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
- calld->deadline = gpr_inf_future;
+ calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
calld->call = grpc_call_from_top_element(elem);
gpr_mu_init(&calld->mu_state);
@@ -653,11 +667,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- if (calld->state == PENDING) {
- gpr_mu_lock(&chand->server->mu_call);
- call_list_remove(elem->call_data, PENDING_START);
- gpr_mu_unlock(&chand->server->mu_call);
- }
+ GPR_ASSERT(calld->state != PENDING);
if (calld->host) {
GRPC_MDSTR_UNREF(calld->host);
@@ -680,8 +690,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
GPR_ASSERT(!is_last);
chand->server = NULL;
chand->channel = NULL;
- chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
- chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
+ chand->path_key = grpc_mdstr_from_string(metadata_context, ":path", 0);
+ chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority", 0);
chand->next = chand->prev = chand;
chand->registered_methods = NULL;
chand->connectivity_state = GRPC_CHANNEL_IDLE;
@@ -725,6 +735,7 @@ static const grpc_channel_filter server_surface_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
"server",
};
@@ -742,9 +753,9 @@ void grpc_server_register_completion_queue(grpc_server *server,
server->cqs[n] = cq;
}
-grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
- size_t filter_count,
- const grpc_channel_args *args) {
+grpc_server *grpc_server_create_from_filters(
+ const grpc_channel_filter **filters, size_t filter_count,
+ const grpc_channel_args *args) {
size_t i;
/* TODO(census): restore this once we finalize census filter etc.
int census_enabled = grpc_channel_args_is_census_enabled(args); */
@@ -764,6 +775,18 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;
+ /* TODO(ctiller): expose a channel_arg for this */
+ server->max_requested_calls = 32768;
+ server->request_freelist =
+ gpr_stack_lockfree_create(server->max_requested_calls);
+ for (i = 0; i < (size_t)server->max_requested_calls; i++) {
+ gpr_stack_lockfree_push(server->request_freelist, i);
+ }
+ request_matcher_init(&server->unregistered_request_matcher,
+ server->max_requested_calls);
+ server->requested_calls = gpr_malloc(server->max_requested_calls *
+ sizeof(*server->requested_calls));
+
/* Server filter stack is:
server_surface_filter - for making surface API calls
@@ -811,6 +834,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
}
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
+ request_matcher_init(&m->request_matcher, server->max_requested_calls);
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
@@ -868,8 +892,8 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_transport_perform_op(transport, &op);
}
- channel =
- grpc_channel_create_from_filters(filters, num_filters, args, mdctx, 0);
+ channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args,
+ mdctx, 0);
chand = (channel_data *)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)
->channel_data;
@@ -889,8 +913,8 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
chand->registered_methods = gpr_malloc(alloc);
memset(chand->registered_methods, 0, alloc);
for (rm = s->registered_methods; rm; rm = rm->next) {
- host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
- method = grpc_mdstr_from_string(mdctx, rm->method);
+ host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host, 0) : NULL;
+ method = grpc_mdstr_from_string(mdctx, rm->method, 0);
hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
for (probes = 0; chand->registered_methods[(hash + probes) % slots]
.server_registered_method != NULL;
@@ -926,13 +950,51 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_transport_perform_op(transport, &op);
}
+typedef struct {
+ requested_call **requests;
+ size_t count;
+ size_t capacity;
+} request_killer;
+
+static void request_killer_init(request_killer *rk) {
+ memset(rk, 0, sizeof(*rk));
+}
+
+static void request_killer_add(request_killer *rk, requested_call *rc) {
+ if (rk->capacity == rk->count) {
+ rk->capacity = GPR_MAX(8, rk->capacity * 2);
+ rk->requests =
+ gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests));
+ }
+ rk->requests[rk->count++] = rc;
+}
+
+static void request_killer_add_request_matcher(request_killer *rk,
+ grpc_server *server,
+ request_matcher *rm) {
+ int request_id;
+ while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
+ request_killer_add(rk, &server->requested_calls[request_id]);
+ }
+}
+
+static void request_killer_run(request_killer *rk, grpc_server *server) {
+ size_t i;
+ for (i = 0; i < rk->count; i++) {
+ fail_call(server, rk->requests[i]);
+ }
+ gpr_free(rk->requests);
+}
+
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
- requested_call *requests = NULL;
registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
+ request_killer reqkill;
+
+ GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
@@ -943,7 +1005,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
sdt = &server->shutdown_tags[server->num_shutdown_tags++];
sdt->tag = tag;
sdt->cq = cq;
- if (server->shutdown) {
+ if (gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_unlock(&server->mu_global);
return;
}
@@ -951,31 +1013,26 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
channel_broadcaster_init(server, &broadcaster);
+ request_killer_init(&reqkill);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
- requests = server->requests;
- server->requests = NULL;
+ request_killer_add_request_matcher(&reqkill, server,
+ &server->unregistered_request_matcher);
+ request_matcher_zombify_all_pending_calls(
+ &server->unregistered_request_matcher);
for (rm = server->registered_methods; rm; rm = rm->next) {
- while (rm->requests != NULL) {
- requested_call *c = rm->requests;
- rm->requests = c->next;
- c->next = requests;
- requests = c;
- }
+ request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher);
+ request_matcher_zombify_all_pending_calls(&rm->request_matcher);
}
gpr_mu_unlock(&server->mu_call);
- server->shutdown = 1;
+ gpr_atm_rel_store(&server->shutdown_flag, 1);
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu_global);
/* terminate all the requested calls */
- while (requests != NULL) {
- requested_call *next = requests->next;
- fail_call(server, requests);
- requests = next;
- }
+ request_killer_run(&reqkill, server);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
@@ -1007,7 +1064,7 @@ void grpc_server_destroy(grpc_server *server) {
listener *l;
gpr_mu_lock(&server->mu_global);
- GPR_ASSERT(server->shutdown || !server->listeners);
+ GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
while (server->listeners) {
@@ -1037,39 +1094,55 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) {
call_data *calld = NULL;
- requested_call **requests = NULL;
- gpr_mu_lock(&server->mu_call);
- if (server->shutdown) {
- gpr_mu_unlock(&server->mu_call);
+ request_matcher *request_matcher = NULL;
+ int request_id;
+ if (gpr_atm_acq_load(&server->shutdown_flag)) {
+ fail_call(server, rc);
+ return GRPC_CALL_OK;
+ }
+ request_id = gpr_stack_lockfree_pop(server->request_freelist);
+ if (request_id == -1) {
+ /* out of request ids: just fail this one */
fail_call(server, rc);
return GRPC_CALL_OK;
}
switch (rc->type) {
case BATCH_CALL:
- calld =
- call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
- requests = &server->requests;
+ request_matcher = &server->unregistered_request_matcher;
break;
case REGISTERED_CALL:
- calld = call_list_remove_head(
- &rc->data.registered.registered_method->pending, PENDING_START);
- requests = &rc->data.registered.registered_method->requests;
+ request_matcher = &rc->data.registered.registered_method->request_matcher;
break;
}
- if (calld != NULL) {
- gpr_mu_unlock(&server->mu_call);
- gpr_mu_lock(&calld->mu_state);
- GPR_ASSERT(calld->state == PENDING);
- calld->state = ACTIVATED;
- gpr_mu_unlock(&calld->mu_state);
- begin_call(server, calld, rc);
- return GRPC_CALL_OK;
- } else {
- rc->next = *requests;
- *requests = rc;
+ server->requested_calls[request_id] = *rc;
+ gpr_free(rc);
+ if (gpr_stack_lockfree_push(request_matcher->requests, request_id)) {
+ /* this was the first queued request: we need to lock and start
+ matching calls */
+ gpr_mu_lock(&server->mu_call);
+ while ((calld = request_matcher->pending_head) != NULL) {
+ request_id = gpr_stack_lockfree_pop(request_matcher->requests);
+ if (request_id == -1) break;
+ request_matcher->pending_head = calld->pending_next;
+ gpr_mu_unlock(&server->mu_call);
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == ZOMBIED) {
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_iomgr_closure_init(
+ &calld->kill_zombie_closure, kill_zombie,
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ } else {
+ GPR_ASSERT(calld->state == PENDING);
+ calld->state = ACTIVATED;
+ gpr_mu_unlock(&calld->mu_state);
+ begin_call(server, calld, &server->requested_calls[request_id]);
+ }
+ gpr_mu_lock(&server->mu_call);
+ }
gpr_mu_unlock(&server->mu_call);
- return GRPC_CALL_OK;
}
+ return GRPC_CALL_OK;
}
grpc_call_error grpc_server_request_call(
@@ -1087,6 +1160,7 @@ grpc_call_error grpc_server_request_call(
}
grpc_cq_begin_op(cq_for_notification);
rc->type = BATCH_CALL;
+ rc->server = server;
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
rc->cq_for_notification = cq_for_notification;
@@ -1109,6 +1183,7 @@ grpc_call_error grpc_server_request_registered_call(
}
grpc_cq_begin_op(cq_for_notification);
rc->type = REGISTERED_CALL;
+ rc->server = server;
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
rc->cq_for_notification = cq_for_notification;
@@ -1188,7 +1263,18 @@ static void begin_call(grpc_server *server, call_data *calld,
}
static void done_request_event(void *req, grpc_cq_completion *c) {
- gpr_free(req);
+ requested_call *rc = req;
+ grpc_server *server = rc->server;
+
+ if (rc >= server->requested_calls &&
+ rc < server->requested_calls + server->max_requested_calls) {
+ gpr_stack_lockfree_push(server->request_freelist,
+ rc - server->requested_calls);
+ } else {
+ gpr_free(req);
+ }
+
+ server_unref(server);
}
static void fail_call(grpc_server *server, requested_call *rc) {
@@ -1201,6 +1287,7 @@ static void fail_call(grpc_server *server, requested_call *rc) {
rc->data.registered.initial_metadata->count = 0;
break;
}
+ server_ref(server);
grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
&rc->completion);
}
@@ -1211,6 +1298,8 @@ static void publish_registered_or_batch(grpc_call *call, int success,
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
requested_call *rc = prc;
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ server_ref(chand->server);
grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
&rc->completion);
GRPC_CALL_INTERNAL_UNREF(call, "server", 0);
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 2899c6dea3..c638d682bb 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -39,9 +39,9 @@
#include "src/core/transport/transport.h"
/* Create a server */
-grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
- size_t filter_count,
- const grpc_channel_args *args);
+grpc_server *grpc_server_create_from_filters(
+ const grpc_channel_filter **filters, size_t filter_count,
+ const grpc_channel_args *args);
/* Add a listener to the server: when the server starts, it will call start,
and when it shuts down, it will call destroy */
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
index b7390675ad..1e26c67693 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/surface/server_create.c
@@ -34,7 +34,10 @@
#include <grpc/grpc.h>
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/server.h"
+#include "src/core/channel/compress_filter.h"
grpc_server *grpc_server_create(const grpc_channel_args *args) {
- return grpc_server_create_from_filters(NULL, 0, args);
+ const grpc_channel_filter *filters[] = {&grpc_compress_filter};
+ return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters),
+ args);
}