diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 55 | ||||
-rw-r--r-- | src/core/surface/channel.c | 6 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 2 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 2 | ||||
-rw-r--r-- | src/core/surface/init.c | 4 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 2 | ||||
-rw-r--r-- | src/core/surface/server.c | 24 | ||||
-rw-r--r-- | src/core/surface/version.c | 2 |
8 files changed, 50 insertions, 47 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 4426bbbce9..4168c2ef0c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -61,18 +61,6 @@ - status/close recv (depending on client/server) */ #define MAX_CONCURRENT_COMPLETIONS 6 -typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; - -typedef enum { - SEND_NOTHING, - SEND_INITIAL_METADATA, - SEND_BUFFERED_INITIAL_METADATA, - SEND_MESSAGE, - SEND_BUFFERED_MESSAGE, - SEND_TRAILING_METADATA_AND_FINISH, - SEND_FINISH -} send_action; - typedef struct { grpc_ioreq_completion_func on_complete; void *user_data; @@ -433,7 +421,7 @@ static grpc_cq_completion *allocate_completion(grpc_call *call) { if (call->allocated_completions & (1u << i)) { continue; } - call->allocated_completions |= 1u << i; + call->allocated_completions |= (gpr_uint8)(1u << i); gpr_mu_unlock(&call->completion_mu); return &call->completions[i]; } @@ -444,7 +432,8 @@ static grpc_cq_completion *allocate_completion(grpc_call *call) { static void done_completion(void *call, grpc_cq_completion *completion) { grpc_call *c = call; gpr_mu_lock(&c->completion_mu); - c->allocated_completions &= ~(1u << (completion - c->completions)); + c->allocated_completions &= + (gpr_uint8) ~(1u << (completion - c->completions)); gpr_mu_unlock(&c->completion_mu); GRPC_CALL_INTERNAL_UNREF(c, "completion", 1); } @@ -520,7 +509,7 @@ static void set_status_code(grpc_call *call, status_source source, if (call->status[source].is_set) return; call->status[source].is_set = 1; - call->status[source].code = status; + call->status[source].code = (grpc_status_code)status; call->error_status_set = status != GRPC_STATUS_OK; if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) { @@ -545,7 +534,7 @@ static void set_encodings_accepted_by_peer( gpr_slice_buffer accept_encoding_parts; gpr_slice_buffer_init(&accept_encoding_parts); - gpr_slice_split(accept_encoding_slice, ", ", &accept_encoding_parts); + gpr_slice_split(accept_encoding_slice, ",", &accept_encoding_parts); /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already * zeroes the whole grpc_call */ @@ -616,7 +605,7 @@ static void unlock(grpc_call *call) { int completing_requests = 0; int start_op = 0; int i; - const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536; + const size_t MAX_RECV_PEEK_AHEAD = 65536; size_t buffered_bytes; int cancel_alarm = 0; @@ -630,9 +619,6 @@ static void unlock(grpc_call *call) { call->cancel_alarm = 0; if (!call->receiving && need_more_data(call)) { - op.recv_ops = &call->recv_ops; - op.recv_state = &call->recv_state; - op.on_done_recv = &call->on_done_recv; if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) { op.max_recv_bytes = call->incoming_message_length - call->incoming_message.length + MAX_RECV_PEEK_AHEAD; @@ -644,9 +630,16 @@ static void unlock(grpc_call *call) { op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes; } } - call->receiving = 1; - GRPC_CALL_INTERNAL_REF(call, "receiving"); - start_op = 1; + /* TODO(ctiller): 1024 is basically to cover a bug + I don't understand yet */ + if (op.max_recv_bytes > 1024) { + op.recv_ops = &call->recv_ops; + op.recv_state = &call->recv_state; + op.on_done_recv = &call->on_done_recv; + call->receiving = 1; + GRPC_CALL_INTERNAL_REF(call, "receiving"); + start_op = 1; + } } if (!call->sending) { @@ -751,7 +744,7 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, size_t i; /* ioreq is live: we need to do something */ master = &call->masters[master_set]; - master->complete_mask |= 1u << op; + master->complete_mask |= (gpr_uint16)(1u << op); if (!success) { master->success = 0; } @@ -1115,10 +1108,12 @@ static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) { /* fall through intended */ case WRITE_STATE_STARTED: if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { + size_t length; data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE]; - grpc_sopb_add_begin_message( - &call->send_ops, grpc_byte_buffer_length(data.send_message), flags); + length = grpc_byte_buffer_length(data.send_message); + GPR_ASSERT(length <= GPR_UINT32_MAX); + grpc_sopb_add_begin_message(&call->send_ops, (gpr_uint32)length, flags); copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops); op->send_ops = &call->send_ops; call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE; @@ -1220,7 +1215,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, grpc_ioreq_completion_func completion, void *user_data) { size_t i; - gpr_uint32 have_ops = 0; + gpr_uint16 have_ops = 0; grpc_ioreq_op op; reqinfo_master *master; grpc_ioreq_data data; @@ -1251,13 +1246,13 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, } if (op == GRPC_IOREQ_SEND_STATUS) { set_status_code(call, STATUS_FROM_SERVER_STATUS, - reqs[i].data.send_status.code); + (gpr_uint32)reqs[i].data.send_status.code); if (reqs[i].data.send_status.details) { set_status_details(call, STATUS_FROM_SERVER_STATUS, GRPC_MDSTR_REF(reqs[i].data.send_status.details)); } } - have_ops |= 1u << op; + have_ops |= (gpr_uint16)(1u << op); call->request_data[op] = data; call->request_flags[op] = reqs[i].flags; @@ -1341,7 +1336,7 @@ static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, GPR_ASSERT(status != GRPC_STATUS_OK); - set_status_code(c, STATUS_FROM_API_OVERRIDE, status); + set_status_code(c, STATUS_FROM_API_OVERRIDE, (gpr_uint32)status); set_status_details(c, STATUS_FROM_API_OVERRIDE, details); c->cancel_with_status = status; diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 586402e21c..a89523b3ab 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -113,7 +113,7 @@ grpc_channel *grpc_channel_create_from_filters( grpc_mdstr_from_string(mdctx, "grpc-message", 0); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { char buf[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa(i, buf); + gpr_ltoa((long)i, buf); channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings( mdctx, GRPC_MDSTR_REF(channel->grpc_status_string), grpc_mdstr_from_string(mdctx, buf, 0)); @@ -134,7 +134,7 @@ grpc_channel *grpc_channel_create_from_filters( gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", GRPC_ARG_MAX_MESSAGE_LENGTH); } else { - channel->max_message_length = args->args[i].value.integer; + channel->max_message_length = (gpr_uint32)args->args[i].value.integer; } } else if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) { if (args->args[i].type != GRPC_ARG_STRING) { @@ -193,7 +193,7 @@ static grpc_call *grpc_channel_create_call_internal( grpc_completion_queue *cq, grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem, gpr_timespec deadline) { grpc_mdelem *send_metadata[2]; - int num_metadata = 0; + size_t num_metadata = 0; GPR_ASSERT(channel->is_client); diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 707251da89..9e2cf1cf66 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -164,7 +164,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, grpc_resolver *resolver; subchannel_factory *f; grpc_mdctx *mdctx = grpc_mdctx_create(); - int n = 0; + size_t n = 0; GPR_ASSERT(!reserved); if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 8de024aaea..74dc09e36e 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -34,7 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H #define GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H -/* Internal API for completion channels */ +/* Internal API for completion queues */ #include "src/core/iomgr/pollset.h" #include <grpc/grpc.h> diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 0d48cd42d7..03bd026a42 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -40,6 +40,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/time.h> #include "src/core/channel/channel_stack.h" +#include "src/core/client_config/lb_policy_registry.h" +#include "src/core/client_config/lb_policies/pick_first.h" #include "src/core/client_config/resolver_registry.h" #include "src/core/client_config/resolvers/dns_resolver.h" #include "src/core/client_config/resolvers/sockaddr_resolver.h" @@ -85,6 +87,8 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { gpr_time_init(); + grpc_lb_policy_registry_init(grpc_pick_first_lb_factory_create()); + grpc_register_lb_policy(grpc_pick_first_lb_factory_create()); grpc_resolver_registry_init("dns:///"); grpc_register_resolver_type(grpc_dns_resolver_factory_create()); grpc_register_resolver_type(grpc_ipv4_resolver_factory_create()); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 35b60bdbef..9b554eeb70 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -197,7 +197,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, subchannel_factory *f; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; - int n = 0; + size_t n = 0; GPR_ASSERT(reserved == NULL); if (grpc_find_security_connector_in_args(args) != NULL) { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 292bf6fab8..3d404f78a4 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -33,6 +33,7 @@ #include "src/core/surface/server.h" +#include <limits.h> #include <stdlib.h> #include <string.h> @@ -203,7 +204,7 @@ struct grpc_server { gpr_stack_lockfree *request_freelist; /** requested call backing data */ requested_call *requested_calls; - int max_requested_calls; + size_t max_requested_calls; gpr_atm shutdown_flag; gpr_uint8 shutdown_published; @@ -298,7 +299,7 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb, */ static void request_matcher_init(request_matcher *request_matcher, - int entries) { + size_t entries) { memset(request_matcher, 0, sizeof(*request_matcher)); request_matcher->requests = gpr_stack_lockfree_create(entries); } @@ -804,7 +805,7 @@ grpc_server *grpc_server_create_from_filters( server->request_freelist = gpr_stack_lockfree_create(server->max_requested_calls); for (i = 0; i < (size_t)server->max_requested_calls; i++) { - gpr_stack_lockfree_push(server->request_freelist, i); + gpr_stack_lockfree_push(server->request_freelist, (int)i); } request_matcher_init(&server->unregistered_request_matcher, server->max_requested_calls); @@ -817,7 +818,7 @@ grpc_server *grpc_server_create_from_filters( grpc_server_census_filter (optional) - for stats collection and tracing {passed in filter stack} grpc_connected_channel_filter - for interfacing with transports */ - server->channel_filter_count = filter_count + 1 + census_enabled; + server->channel_filter_count = filter_count + 1u + (census_enabled ? 1u : 0u); server->channel_filters = gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *)); server->channel_filters[0] = &server_surface_filter; @@ -825,7 +826,7 @@ grpc_server *grpc_server_create_from_filters( server->channel_filters[1] = &grpc_server_census_filter; } for (i = 0; i < filter_count; i++) { - server->channel_filters[i + 1 + census_enabled] = filters[i]; + server->channel_filters[i + 1u + (census_enabled ? 1u : 0u)] = filters[i]; } server->channel_args = grpc_channel_args_copy(args); @@ -896,7 +897,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_mdstr *host; grpc_mdstr *method; gpr_uint32 hash; - gpr_uint32 slots; + size_t slots; gpr_uint32 probes; gpr_uint32 max_probes = 0; grpc_transport_op op; @@ -949,7 +950,8 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, crm->host = host; crm->method = method; } - chand->registered_method_slots = slots; + GPR_ASSERT(slots <= GPR_UINT32_MAX); + chand->registered_method_slots = (gpr_uint32)slots; chand->registered_method_max_probes = max_probes; } @@ -970,7 +972,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, op.set_accept_stream_user_data = chand; op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; - op.disconnect = gpr_atm_acq_load(&s->shutdown_flag); + op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0; grpc_transport_perform_op(transport, &op); } @@ -1246,7 +1248,8 @@ static void begin_call(grpc_server *server, call_data *calld, } GRPC_CALL_INTERNAL_REF(calld->call, "server"); - grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc); + grpc_call_start_ioreq_and_call_back(calld->call, req, (size_t)(r - req), + publish, rc); } static void done_request_event(void *req, grpc_cq_completion *c) { @@ -1255,8 +1258,9 @@ static void done_request_event(void *req, grpc_cq_completion *c) { if (rc >= server->requested_calls && rc < server->requested_calls + server->max_requested_calls) { + GPR_ASSERT(rc - server->requested_calls <= INT_MAX); gpr_stack_lockfree_push(server->request_freelist, - rc - server->requested_calls); + (int)(rc - server->requested_calls)); } else { gpr_free(req); } diff --git a/src/core/surface/version.c b/src/core/surface/version.c index d7aaba3868..4b90e06a04 100644 --- a/src/core/surface/version.c +++ b/src/core/surface/version.c @@ -37,5 +37,5 @@ #include <grpc/grpc.h> const char *grpc_version_string(void) { - return "0.10.1.0"; + return "0.11.0.0"; } |