diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 3 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 16 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.h | 8 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack_builder.h | 8 | ||||
-rw-r--r-- | src/core/lib/channel/context.h | 11 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_posix.c | 25 | ||||
-rw-r--r-- | src/core/lib/security/context/security_context.h | 8 | ||||
-rw-r--r-- | src/core/lib/surface/channel.h | 8 | ||||
-rw-r--r-- | src/core/lib/surface/channel_init.h | 8 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 9 | ||||
-rw-r--r-- | src/core/lib/transport/byte_stream.h | 6 | ||||
-rw-r--r-- | src/core/lib/transport/metadata.h | 8 | ||||
-rw-r--r-- | src/core/lib/transport/metadata_batch.c | 3 | ||||
-rw-r--r-- | src/core/lib/transport/metadata_batch.h | 12 | ||||
-rw-r--r-- | src/core/lib/transport/transport.h | 10 |
15 files changed, 118 insertions, 25 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index dec25efe61..af913d8a9d 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -767,6 +767,9 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) { lb_client->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(3, GPR_TIMESPAN)); + /* Note the following LB call progresses every time there's activity in \a + * glb_policy->base.interested_parties, which is comprised of the polling + * entities passed to glb_pick(). */ lb_client->lb_call = grpc_channel_create_pollset_set_call( glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, "/BalanceLoad", diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index f2f5465201..0e8dfb7d96 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -94,7 +94,8 @@ static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t, static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_error *error); + grpc_chttp2_transport *t, grpc_error *error, + const char *reason); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -876,7 +877,7 @@ static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "start_writing:nothing_to_write"); } - end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write")); + end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE, "Nothing to write"); if (t->ep && !t->endpoint_reading) { destroy_endpoint(exec_ctx, t); } @@ -925,11 +926,18 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } +/* error may be GRPC_ERROR_NONE if there is no error allocated yet. + In that case, use "reason" as the text for a new error. */ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_error *error) { + grpc_chttp2_transport *t, grpc_error *error, + const char *reason) { grpc_chttp2_stream_global *stream_global; while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, &stream_global)) { + if (error == GRPC_ERROR_NONE && reason != NULL) { + /* create error object. */ + error = GRPC_ERROR_CREATE(reason); + } fail_pending_writes(exec_ctx, &t->global, stream_global, GRPC_ERROR_REF(error)); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); @@ -951,7 +959,7 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); - end_waiting_for_write(exec_ctx, t, error); + end_waiting_for_write(exec_ctx, t, error, NULL); switch (t->executor.write_state) { case GRPC_CHTTP2_WRITING_INACTIVE: diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index ee68701456..6b73cce380 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -51,6 +51,10 @@ #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/transport/transport.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct grpc_channel_element grpc_channel_element; typedef struct grpc_call_element grpc_call_element; @@ -291,4 +295,8 @@ extern int grpc_trace_channel; #define GRPC_CALL_LOG_OP(sev, elem, op) \ if (grpc_trace_channel) grpc_call_log_op(sev, elem, op) +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H */ diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h index 0e6bfd9aa6..4a00f7bfdb 100644 --- a/src/core/lib/channel/channel_stack_builder.h +++ b/src/core/lib/channel/channel_stack_builder.h @@ -39,6 +39,10 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#ifdef __cplusplus +extern "C" { +#endif + /// grpc_channel_stack_builder offers a programmatic interface to selected /// and order channel filters typedef struct grpc_channel_stack_builder grpc_channel_stack_builder; @@ -158,4 +162,8 @@ void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder); extern int grpc_trace_channel_stack_builder; +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H */ diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index c50e84279d..071c5f695c 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -34,10 +34,19 @@ #ifndef GRPC_CORE_LIB_CHANNEL_CONTEXT_H #define GRPC_CORE_LIB_CHANNEL_CONTEXT_H -/* Call object context pointers */ +/// Call object context pointers. + +/// Call context is represented as an array of \a grpc_call_context_elements. +/// This enum represents the indexes into the array, where each index +/// contains a different type of value. typedef enum { + /// Value is either a \a grpc_client_security_context or a + /// \a grpc_server_security_context. GRPC_CONTEXT_SECURITY = 0, + + /// Value is a \a census_context. GRPC_CONTEXT_TRACING, + GRPC_CONTEXT_COUNT } grpc_context_index; diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 38ebd2dbcb..ea8d241154 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -90,10 +90,12 @@ struct grpc_tcp_listener { grpc_closure read_closure; grpc_closure destroyed_closure; struct grpc_tcp_listener *next; - /* When we add a listener, more than one can be created, mainly because of - IPv6. A sibling will still be in the normal list, but will be flagged - as such. Any action, such as ref or unref, will affect all of the - siblings in the list. */ + /* sibling is a linked list of all listeners for a given port. add_port and + clone_port place all new listeners in the same sibling list. A member of + the 'sibling' list is also a member of the 'next' list. The head of each + sibling list has is_sibling==0, and subsequent members of sibling lists + have is_sibling==1. is_sibling allows separate sibling lists to be + identified while iterating through 'next'. */ struct grpc_tcp_listener *sibling; int is_sibling; }; @@ -306,7 +308,7 @@ static grpc_error *prepare_socket(int fd, const struct sockaddr *addr, GPR_ASSERT(fd >= 0); - if (so_reuseport) { + if (so_reuseport && !grpc_is_unix_socket(addr)) { err = grpc_set_socket_reuse_port(fd, 1); if (err != GRPC_ERROR_NONE) goto error; } @@ -480,6 +482,9 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, return err; } +/* Insert count new listeners after listener. Every new listener will have the + same listen address as listener (SO_REUSEPORT must be enabled). Every new + listener is a sibling of listener. */ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { grpc_tcp_listener *sp = NULL; char *addr_str; @@ -506,6 +511,11 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { sp = gpr_malloc(sizeof(grpc_tcp_listener)); sp->next = listener->next; listener->next = sp; + /* sp (the new listener) is a sibling of 'listener' (the original + listener). */ + sp->is_sibling = 1; + sp->sibling = listener->sibling; + listener->sibling = sp; sp->server = listener->server; sp->fd = fd; sp->emfd = grpc_fd_create(fd, name); @@ -514,8 +524,6 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { sp->port = port; sp->port_index = listener->port_index; sp->fd_index = listener->fd_index + count - i; - sp->is_sibling = 1; - sp->sibling = listener->is_sibling ? listener->sibling : listener; GPR_ASSERT(sp->emfd); while (listener->server->tail->next != NULL) { listener->server->tail = listener->server->tail->next; @@ -685,7 +693,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, s->pollset_count = pollset_count; sp = s->head; while (sp != NULL) { - if (s->so_reuseport && pollset_count > 1) { + if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr.sockaddr) && + pollset_count > 1) { GPR_ASSERT(GRPC_LOG_IF_ERROR( "clone_port", clone_port(sp, (unsigned)(pollset_count - 1)))); for (i = 0; i < pollset_count; i++) { diff --git a/src/core/lib/security/context/security_context.h b/src/core/lib/security/context/security_context.h index ef0c06b1fb..4e7666dfe3 100644 --- a/src/core/lib/security/context/security_context.h +++ b/src/core/lib/security/context/security_context.h @@ -37,6 +37,10 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/security/credentials/credentials.h" +#ifdef __cplusplus +extern "C" { +#endif + /* --- grpc_auth_context --- High level authentication context object. Can optionally be chained. */ @@ -111,4 +115,8 @@ grpc_auth_context *grpc_auth_context_from_arg(const grpc_arg *arg); grpc_auth_context *grpc_find_auth_context_in_args( const grpc_channel_args *args); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_SECURITY_CONTEXT_SECURITY_CONTEXT_H */ diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 7eff7b8883..4c62974346 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -42,6 +42,14 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, grpc_channel_stack_type channel_stack_type, grpc_transport *optional_transport); +/** Create a call given a grpc_channel, in order to call \a method. + Progress is tied to activity on \a pollset_set. The returned call object is + meant to be used with \a grpc_call_start_batch_and_execute, which relies on + callbacks to signal completions. \a method and \a host need + only live through the invocation of this function. If \a parent_call is + non-NULL, it must be a server-side call. It will be used to propagate + properties from the server call to this new client call, depending on the + value of \a propagation_mask (see propagation_bits.h for possible values) */ grpc_call *grpc_channel_create_pollset_set_call( grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_pollset_set *pollset_set, const char *method, const char *host, diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h index 3a18a61ddb..b53f2aefb9 100644 --- a/src/core/lib/surface/channel_init.h +++ b/src/core/lib/surface/channel_init.h @@ -40,6 +40,10 @@ #define GRPC_CHANNEL_INIT_BUILTIN_PRIORITY 10000 +#ifdef __cplusplus +extern "C" { +#endif + /// This module provides a way for plugins (and the grpc core library itself) /// to register mutators for channel stacks. /// It also provides a universal entry path to run those mutators to build @@ -84,4 +88,8 @@ bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, grpc_channel_stack_type type); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_INIT_H */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 1d61ec3bbb..64afcecc07 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -272,7 +272,7 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, } static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, - int send_goaway, grpc_error *send_disconnect) { + bool send_goaway, grpc_error *send_disconnect) { grpc_transport_op op; struct shutdown_cleanup_args *sc; grpc_channel_element *elem; @@ -293,7 +293,7 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, channel_broadcaster *cb, - int send_goaway, + bool send_goaway, grpc_error *force_disconnect) { size_t i; @@ -1252,7 +1252,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, l->destroy(&exec_ctx, server, l->arg, &l->destroy_done); } - channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 1, 0); + channel_broadcaster_shutdown(&exec_ctx, &broadcaster, true /* send_goaway */, + GRPC_ERROR_NONE); done: grpc_exec_ctx_finish(&exec_ctx); @@ -1268,7 +1269,7 @@ void grpc_server_cancel_all_calls(grpc_server *server) { channel_broadcaster_init(server, &broadcaster); gpr_mu_unlock(&server->mu_global); - channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 0, + channel_broadcaster_shutdown(&exec_ctx, &broadcaster, false /* send_goaway */, GRPC_ERROR_CREATE("Cancelling all calls")); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 95519a9eaf..e64dce6283 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -59,13 +59,9 @@ struct grpc_byte_stream { * on_complete will not be called), 0 if the bytes will be available * asynchronously. * - * on entry, *remaining can be set as a hint as to the maximum number + * max_size_hint can be set as a hint as to the maximum number * of bytes that would be acceptable to read. * - * fills *buffer, *length, *remaining with the bytes, length of bytes - * and length of data remaining to be read before either returning 1 - * or calling on_complete. - * * once a slice is returned into *slice, it is owned by the caller. */ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h index 6d82f4d681..2b0921c8d7 100644 --- a/src/core/lib/transport/metadata.h +++ b/src/core/lib/transport/metadata.h @@ -37,6 +37,10 @@ #include <grpc/support/slice.h> #include <grpc/support/useful.h> +#ifdef __cplusplus +extern "C" { +#endif + /* This file provides a mechanism for tracking metadata through the grpc stack. It's not intended for consumption outside of the library. @@ -164,4 +168,8 @@ void grpc_mdctx_global_shutdown(void); extern gpr_slice (*grpc_chttp2_base64_encode_and_huffman_compress)( gpr_slice input); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_TRANSPORT_METADATA_H */ diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c index e4398abeb7..84b5a74d51 100644 --- a/src/core/lib/transport/metadata_batch.c +++ b/src/core/lib/transport/metadata_batch.c @@ -33,6 +33,7 @@ #include "src/core/lib/transport/metadata_batch.h" +#include <stdbool.h> #include <string.h> #include <grpc/support/alloc.h> @@ -187,7 +188,7 @@ void grpc_metadata_batch_clear(grpc_metadata_batch *batch) { grpc_metadata_batch_filter(batch, no_metadata_for_you, NULL); } -int grpc_metadata_batch_is_empty(grpc_metadata_batch *batch) { +bool grpc_metadata_batch_is_empty(grpc_metadata_batch *batch) { return batch->list.head == NULL && gpr_time_cmp(gpr_inf_future(batch->deadline.clock_type), batch->deadline) == 0; diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h index 7af823f7ca..0424b4db98 100644 --- a/src/core/lib/transport/metadata_batch.h +++ b/src/core/lib/transport/metadata_batch.h @@ -34,12 +34,18 @@ #ifndef GRPC_CORE_LIB_TRANSPORT_METADATA_BATCH_H #define GRPC_CORE_LIB_TRANSPORT_METADATA_BATCH_H +#include <stdbool.h> + #include <grpc/grpc.h> #include <grpc/support/port_platform.h> #include <grpc/support/slice.h> #include <grpc/support/time.h> #include "src/core/lib/transport/metadata.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct grpc_linked_mdelem { grpc_mdelem *md; struct grpc_linked_mdelem *next; @@ -64,7 +70,7 @@ typedef struct grpc_metadata_batch { void grpc_metadata_batch_init(grpc_metadata_batch *batch); void grpc_metadata_batch_destroy(grpc_metadata_batch *batch); void grpc_metadata_batch_clear(grpc_metadata_batch *batch); -int grpc_metadata_batch_is_empty(grpc_metadata_batch *batch); +bool grpc_metadata_batch_is_empty(grpc_metadata_batch *batch); /* Returns the transport size of the batch. */ size_t grpc_metadata_batch_size(grpc_metadata_batch *batch); @@ -125,4 +131,8 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd); } while (0) #endif +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_TRANSPORT_METADATA_BATCH_H */ diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index e33fc5c761..1705e6e582 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -43,6 +43,10 @@ #include "src/core/lib/transport/byte_stream.h" #include "src/core/lib/transport/metadata_batch.h" +#ifdef __cplusplus +extern "C" { +#endif + /* forward declarations */ typedef struct grpc_transport grpc_transport; @@ -158,7 +162,7 @@ typedef struct grpc_transport_op { /** should we send a goaway? after a goaway is sent, once there are no more active calls on the transport, the transport should disconnect */ - int send_goaway; + bool send_goaway; /** what should the goaway contain? */ grpc_status_code goaway_status; gpr_slice *goaway_message; @@ -268,4 +272,8 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport); char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *transport); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */ |