From f3756c1e0d09a90173851a8e8ab4f6342ae8de45 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 1 Jul 2015 17:21:01 -0700 Subject: Introduce multiple clocks to GPR --- src/core/surface/call.c | 2 +- src/core/surface/completion_queue.c | 2 +- src/core/surface/init.c | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 181617fff8..82bd1b2de2 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1190,7 +1190,7 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { } GRPC_CALL_INTERNAL_REF(call, "alarm"); call->have_alarm = 1; - grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); + grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now(GPR_CLOCK_REALTIME)); } /* we offset status by a small amount when storing it into transport metadata diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 030a8b4e6f..3c074da652 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -325,7 +325,7 @@ void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); grpc_pollset_kick(&cc->pollset); grpc_pollset_work(&cc->pollset, - gpr_time_add(gpr_now(), gpr_time_from_millis(100))); + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100))); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } diff --git a/src/core/surface/init.c b/src/core/surface/init.c index ca61a38a35..db896934a3 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -33,6 +33,7 @@ #include #include +#include #include "src/core/channel/channel_stack.h" #include "src/core/debug/trace.h" #include "src/core/iomgr/iomgr.h" @@ -56,6 +57,7 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { + gpr_time_init(); grpc_register_tracer("channel", &grpc_trace_channel); grpc_register_tracer("surface", &grpc_surface_trace); grpc_register_tracer("http", &grpc_http_trace); -- cgit v1.2.3 From f1bff016319861348e6a0460954c35634ea452b2 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 6 Jul 2015 11:20:50 -0700 Subject: clang-format changed files --- include/grpc/support/time.h | 10 ++++----- src/core/channel/census_filter.c | 28 ++++++++++++++++++-------- src/core/channel/client_setup.c | 5 +++-- src/core/iomgr/iomgr.c | 3 ++- src/core/iomgr/pollset_posix.c | 7 ++++--- src/core/iomgr/pollset_windows.c | 4 +--- src/core/iomgr/tcp_client_posix.c | 3 ++- src/core/iomgr/tcp_client_windows.c | 3 ++- src/core/security/credentials.c | 14 ++++++++----- src/core/security/google_default_credentials.c | 7 ++++--- src/core/security/json_token.c | 5 ++--- src/core/statistics/census_tracing.c | 12 +++++------ src/core/statistics/window_stats.h | 2 +- src/core/support/cancellable.c | 4 ++-- src/core/support/time_posix.c | 7 ++----- src/core/support/time_win32.c | 3 ++- src/core/surface/call.c | 20 +++++++++--------- src/core/surface/completion_queue.c | 14 ++++++------- src/core/transport/chttp2/parsing.c | 3 ++- src/core/transport/chttp2/stream_encoder.c | 3 ++- test/core/end2end/cq_verifier.c | 3 ++- test/core/fling/server.c | 3 ++- test/core/httpcli/httpcli_test.c | 3 ++- test/core/iomgr/alarm_list_test.c | 14 ++++++------- test/core/iomgr/tcp_client_posix_test.c | 3 ++- test/core/support/cancellable_test.c | 26 +++++++++++++----------- test/core/support/sync_test.c | 12 ++++++----- test/core/util/test_config.h | 8 ++++---- 28 files changed, 128 insertions(+), 101 deletions(-) (limited to 'src/core/surface') diff --git a/include/grpc/support/time.h b/include/grpc/support/time.h index fb8371bb46..a5c947dfc8 100644 --- a/include/grpc/support/time.h +++ b/include/grpc/support/time.h @@ -46,8 +46,8 @@ extern "C" { #endif typedef struct gpr_timespec { - time_t tv_sec; - int tv_nsec; + time_t tv_sec; + int tv_nsec; } gpr_timespec; /* Time constants. */ @@ -66,8 +66,8 @@ extern const gpr_timespec gpr_inf_past; /* The far past. */ typedef enum { /* Monotonic clock. Epoch undefined. Always moves forwards. */ GPR_CLOCK_MONOTONIC = 0, - /* Realtime clock. May jump forwards or backwards. Settable by - the system administrator. Has its epoch at 0:00:00 UTC 1 Jan 1970. */ + /* Realtime clock. May jump forwards or backwards. Settable by + the system administrator. Has its epoch at 0:00:00 UTC 1 Jan 1970. */ GPR_CLOCK_REALTIME } gpr_clock_type; @@ -112,4 +112,4 @@ double gpr_timespec_to_micros(gpr_timespec t); } #endif -#endif /* GRPC_SUPPORT_TIME_H */ +#endif /* GRPC_SUPPORT_TIME_H */ diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c index 0aa3cc3710..481e4eaf48 100644 --- a/src/core/channel/census_filter.c +++ b/src/core/channel/census_filter.c @@ -175,8 +175,8 @@ static void server_init_call_elem(grpc_call_element* elem, static void server_destroy_call_elem(grpc_call_element* elem) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); - d->stats.elapsed_time_ms = - gpr_timespec_to_micros(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), d->start_ts)); + d->stats.elapsed_time_ms = gpr_timespec_to_micros( + gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), d->start_ts)); census_record_rpc_server_stats(d->op_id, &d->stats); census_tracing_end_op(d->op_id); } @@ -200,11 +200,23 @@ static void destroy_channel_elem(grpc_channel_element* elem) { } const grpc_channel_filter grpc_client_census_filter = { - client_start_transport_op, channel_op, sizeof(call_data), - client_init_call_elem, client_destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "census-client"}; + client_start_transport_op, + channel_op, + sizeof(call_data), + client_init_call_elem, + client_destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "census-client"}; const grpc_channel_filter grpc_server_census_filter = { - server_start_transport_op, channel_op, sizeof(call_data), - server_init_call_elem, server_destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "census-server"}; + server_start_transport_op, + channel_op, + sizeof(call_data), + server_init_call_elem, + server_destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "census-server"}; diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c index 9844f4e43e..3663130085 100644 --- a/src/core/channel/client_setup.c +++ b/src/core/channel/client_setup.c @@ -94,7 +94,8 @@ static void setup_initiate(grpc_transport_setup *sp) { int in_alarm = 0; r->setup = s; - r->deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(60)); + r->deadline = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(60)); gpr_mu_lock(&s->mu); GPR_ASSERT(s->refs > 0); @@ -231,7 +232,7 @@ int grpc_client_setup_request_should_continue(grpc_client_setup_request *r, return result; } -static void backoff_alarm_done(void *arg /* grpc_client_setup_request */, +static void backoff_alarm_done(void *arg /* grpc_client_setup_request */, int success) { grpc_client_setup_request *r = arg; grpc_client_setup *s = r->setup; diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 39ba4f5264..6954ddbf76 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -67,7 +67,8 @@ static void background_callback_executor(void *ignored) { gpr_mu_unlock(&g_mu); closure->cb(closure->cb_arg, closure->success); gpr_mu_lock(&g_mu); - } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_REALTIME), &deadline)) { + } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_REALTIME), + &deadline)) { } else { gpr_mu_unlock(&g_mu); gpr_sleep_until(gpr_time_min(short_deadline, deadline)); diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 852f2f4740..205b1f2dbb 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -187,15 +187,16 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); } -int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) { +int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, + gpr_timespec now) { gpr_timespec timeout; static const int max_spin_polling_us = 10; if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { return -1; } if (gpr_time_cmp( - deadline, - gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) { + deadline, + gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) { return 0; } timeout = gpr_time_sub(deadline, now); diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index c0573030e0..24226cc980 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -86,8 +86,6 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { return 1 /* GPR_TRUE */; } -void grpc_pollset_kick(grpc_pollset *p) { - gpr_cv_signal(&p->cv); -} +void grpc_pollset_kick(grpc_pollset *p) { gpr_cv_signal(&p->cv); } #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index e0b0fd6e53..76bb15d26a 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -246,7 +246,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->write_closure.cb_arg = ac; gpr_mu_lock(&ac->mu); - grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now(GPR_CLOCK_REALTIME)); + grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, + gpr_now(GPR_CLOCK_REALTIME)); grpc_fd_notify_on_write(ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 2ae08f2435..16741452b9 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -215,7 +215,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), ac->refs = 2; ac->aborted = 0; - grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now(GPR_CLOCK_REALTIME)); + grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, + gpr_now(GPR_CLOCK_REALTIME)); socket->write_info.outstanding = 1; grpc_socket_notify_on_write(socket, on_connect, ac); return; diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index 4481554d74..53643c7731 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -384,7 +384,8 @@ static void jwt_get_request_metadata(grpc_credentials *creds, if (c->cached.service_url != NULL && strcmp(c->cached.service_url, service_url) == 0 && c->cached.jwt_md != NULL && - (gpr_time_cmp(gpr_time_sub(c->cached.jwt_expiration, gpr_now(GPR_CLOCK_REALTIME)), + (gpr_time_cmp(gpr_time_sub(c->cached.jwt_expiration, + gpr_now(GPR_CLOCK_REALTIME)), refresh_threshold) > 0)) { jwt_md = grpc_credentials_md_store_ref(c->cached.jwt_md); } @@ -401,7 +402,8 @@ static void jwt_get_request_metadata(grpc_credentials *creds, char *md_value; gpr_asprintf(&md_value, "Bearer %s", jwt); gpr_free(jwt); - c->cached.jwt_expiration = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c->jwt_lifetime); + c->cached.jwt_expiration = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c->jwt_lifetime); c->cached.service_url = gpr_strdup(service_url); c->cached.jwt_md = grpc_credentials_md_store_create(1); grpc_credentials_md_store_add_cstrings( @@ -586,7 +588,8 @@ static void on_oauth2_token_fetcher_http_response( status = grpc_oauth2_token_fetcher_credentials_parse_server_response( response, &c->access_token_md, &token_lifetime); if (status == GRPC_CREDENTIALS_OK) { - c->token_expiration = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime); + c->token_expiration = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime); r->cb(r->user_data, c->access_token_md->entries, c->access_token_md->num_entries, status); } else { @@ -608,8 +611,9 @@ static void oauth2_token_fetcher_get_request_metadata( { gpr_mu_lock(&c->mu); if (c->access_token_md != NULL && - (gpr_time_cmp(gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_REALTIME)), - refresh_threshold) > 0)) { + (gpr_time_cmp( + gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_REALTIME)), + refresh_threshold) > 0)) { cached_access_token_md = grpc_credentials_md_store_ref(c->access_token_md); } diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index 93e609456e..e207f66aed 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -104,9 +104,10 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_context_init(&context); - grpc_httpcli_get(&context, &detector.pollset, &request, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), - on_compute_engine_detection_http_response, &detector); + grpc_httpcli_get( + &context, &detector.pollset, &request, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), + on_compute_engine_detection_http_response, &detector); /* Block until we get the response. This is not ideal but this should only be called once for the lifetime of the process by the default credentials. */ diff --git a/src/core/security/json_token.c b/src/core/security/json_token.c index 3c411d76cf..a680360eb5 100644 --- a/src/core/security/json_token.c +++ b/src/core/security/json_token.c @@ -218,8 +218,8 @@ static char *encoded_jwt_claim(const grpc_auth_json_key *json_key, gpr_ltoa(now.tv_sec, now_str); gpr_ltoa(expiration.tv_sec, expiration_str); - child = create_child(NULL, json, "iss", json_key->client_email, - GRPC_JSON_STRING); + child = + create_child(NULL, json, "iss", json_key->client_email, GRPC_JSON_STRING); if (scope != NULL) { child = create_child(child, json, "scope", scope, GRPC_JSON_STRING); } else { @@ -396,4 +396,3 @@ void grpc_auth_refresh_token_destruct(grpc_auth_refresh_token *refresh_token) { refresh_token->refresh_token = NULL; } } - diff --git a/src/core/statistics/census_tracing.c b/src/core/statistics/census_tracing.c index 3b31a02cb3..3036ba5407 100644 --- a/src/core/statistics/census_tracing.c +++ b/src/core/statistics/census_tracing.c @@ -143,8 +143,8 @@ void census_tracing_end_op(census_op_id op_id) { gpr_mu_lock(&g_mu); trace = census_ht_find(g_trace_store, op_id_as_key(&op_id)); if (trace != NULL) { - trace->rpc_stats.elapsed_time_ms = - gpr_timespec_to_micros(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), trace->ts)); + trace->rpc_stats.elapsed_time_ms = gpr_timespec_to_micros( + gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), trace->ts)); gpr_log(GPR_DEBUG, "End tracing for id %lu, method %s, latency %f us", op_id_2_uint64(&op_id), trace->method, trace->rpc_stats.elapsed_time_ms); @@ -194,8 +194,8 @@ const char* census_get_trace_method_name(const census_trace_obj* trace) { static census_trace_annotation* dup_annotation_chain( census_trace_annotation* from) { - census_trace_annotation *ret = NULL; - census_trace_annotation **to = &ret; + census_trace_annotation* ret = NULL; + census_trace_annotation** to = &ret; for (; from != NULL; from = from->next) { *to = gpr_malloc(sizeof(census_trace_annotation)); memcpy(*to, from, sizeof(census_trace_annotation)); @@ -223,9 +223,9 @@ census_trace_obj** census_get_active_ops(int* num_active_ops) { size_t n = 0; census_ht_kv* all_kvs = census_ht_get_all_elements(g_trace_store, &n); *num_active_ops = (int)n; - if (n != 0 ) { + if (n != 0) { size_t i = 0; - ret = gpr_malloc(sizeof(census_trace_obj *) * n); + ret = gpr_malloc(sizeof(census_trace_obj*) * n); for (i = 0; i < n; i++) { ret[i] = trace_obj_dup((census_trace_obj*)all_kvs[i].v); } diff --git a/src/core/statistics/window_stats.h b/src/core/statistics/window_stats.h index 9651198c71..0020f6b44c 100644 --- a/src/core/statistics/window_stats.h +++ b/src/core/statistics/window_stats.h @@ -170,4 +170,4 @@ void census_window_stats_get_sums(const struct census_window_stats* wstats, assertion failure). This function is thread-compatible. */ void census_window_stats_destroy(struct census_window_stats* wstats); -#endif /* GRPC_INTERNAL_CORE_STATISTICS_WINDOW_STATS_H */ +#endif /* GRPC_INTERNAL_CORE_STATISTICS_WINDOW_STATS_H */ diff --git a/src/core/support/cancellable.c b/src/core/support/cancellable.c index 4f8f237f40..3cbb318ab6 100644 --- a/src/core/support/cancellable.c +++ b/src/core/support/cancellable.c @@ -121,8 +121,8 @@ void gpr_cancellable_cancel(gpr_cancellable *c) { } else { gpr_event ev; gpr_event_init(&ev); - gpr_event_wait(&ev, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000))); + gpr_event_wait(&ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000))); } } } while (failures != 0); diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index d5797e0db2..f9b7958783 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -56,10 +56,7 @@ static gpr_timespec gpr_from_timespec(struct timespec ts) { } /** maps gpr_clock_type --> clockid_t for clock_gettime */ -static clockid_t clockid_for_gpr_clock[] = { - CLOCK_MONOTONIC, - CLOCK_REALTIME -}; +static clockid_t clockid_for_gpr_clock[] = {CLOCK_MONOTONIC, CLOCK_REALTIME}; void gpr_time_init(void) {} @@ -79,7 +76,7 @@ static double g_time_scale; static uint64_t g_time_start; void gpr_time_init(void) { - mach_timebase_info_data_t tb = { 0, 1 }; + mach_timebase_info_data_t tb = {0, 1}; mach_timebase_info(&tb); g_time_scale = tb.numer; g_time_scale /= tb.denom; diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c index 827abc741d..d015133561 100644 --- a/src/core/support/time_win32.c +++ b/src/core/support/time_win32.c @@ -64,7 +64,8 @@ void gpr_sleep_until(gpr_timespec until) { } delta = gpr_time_sub(until, now); - sleep_millis = (DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS; + sleep_millis = + (DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS; Sleep(sleep_millis); } } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 82bd1b2de2..b86df19cb5 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -464,8 +464,7 @@ static int need_more_data(grpc_call *call) { (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || (call->write_state == WRITE_STATE_INITIAL && !call->is_client) || - (call->cancel_with_status != GRPC_STATUS_OK) || - call->destroy_called; + (call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called; } static void unlock(grpc_call *call) { @@ -1155,7 +1154,8 @@ static void execute_op(grpc_call *call, grpc_transport_op *op) { } else { finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args)); args->call = call; - grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args); + grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, + args); op->on_consumed = &args->closure; } } @@ -1190,7 +1190,8 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { } GRPC_CALL_INTERNAL_REF(call, "alarm"); call->have_alarm = 1; - grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now(GPR_CLOCK_REALTIME)); + grpc_alarm_init(&call->alarm, deadline, call_alarm, call, + gpr_now(GPR_CLOCK_REALTIME)); } /* we offset status by a small amount when storing it into transport metadata @@ -1229,13 +1230,13 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) { } else { gpr_uint32 parsed_clevel_bytes; if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), - GPR_SLICE_LENGTH(md->value->slice), - &parsed_clevel_bytes)) { + GPR_SLICE_LENGTH(md->value->slice), + &parsed_clevel_bytes)) { /* the following cast is safe, as a gpr_uint32 should be able to hold all * possible values of the grpc_compression_level enum */ - clevel = (grpc_compression_level) parsed_clevel_bytes; + clevel = (grpc_compression_level)parsed_clevel_bytes; } else { - clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ + clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ } grpc_mdelem_set_user_data(md, destroy_compression, (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET)); @@ -1258,7 +1259,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); } else if (key == grpc_channel_get_message_string(call->channel)) { set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); - } else if (key == grpc_channel_get_compresssion_level_string(call->channel)) { + } else if (key == + grpc_channel_get_compresssion_level_string(call->channel)) { set_decode_compression_level(call, decode_compression(md)); } else { dest = &call->buffered_metadata[is_trailing]; diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 3c074da652..c3f209667f 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -90,9 +90,8 @@ grpc_completion_queue *grpc_completion_queue_create(void) { #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", - cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, - reason); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc, + (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason); #else void grpc_cq_internal_ref(grpc_completion_queue *cc) { #endif @@ -107,9 +106,8 @@ static void on_pollset_destroy_done(void *arg) { #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", - cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, - reason); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc, + (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason); #else void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif @@ -324,8 +322,8 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); grpc_pollset_kick(&cc->pollset); - grpc_pollset_work(&cc->pollset, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100))); + grpc_pollset_work(&cc->pollset, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(100))); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index a11b7e6d36..51278406f5 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -205,7 +205,8 @@ void grpc_chttp2_publish_reads( } if (stream_parsing->saw_rst_stream) { stream_global->cancelled = 1; - stream_global->cancelled_status = grpc_chttp2_http2_error_to_grpc_status(stream_parsing->rst_stream_reason); + stream_global->cancelled_status = grpc_chttp2_http2_error_to_grpc_status( + stream_parsing->rst_stream_reason); if (stream_parsing->rst_stream_reason == GRPC_CHTTP2_NO_ERROR) { stream_global->published_cancelled = 1; } diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index db642961ec..fb1002861a 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -437,7 +437,8 @@ static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline, framer_state *st) { char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE]; grpc_mdelem *mdelem; - grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now(GPR_CLOCK_REALTIME)), timeout_str); + grpc_chttp2_encode_timeout( + gpr_time_sub(deadline, gpr_now(GPR_CLOCK_REALTIME)), timeout_str); mdelem = grpc_mdelem_from_metadata_strings( c->mdctx, grpc_mdstr_ref(c->timeout_key_str), grpc_mdstr_from_string(c->mdctx, timeout_str)); diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c index c290cfdbad..407c72b519 100644 --- a/test/core/end2end/cq_verifier.c +++ b/test/core/end2end/cq_verifier.c @@ -248,7 +248,8 @@ void cq_verify(cq_verifier *v) { } void cq_verify_empty(cq_verifier *v) { - gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(1)); + gpr_timespec deadline = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(1)); grpc_event ev; GPR_ASSERT(v->expect.next == &v->expect && "expectation queue must be empty"); diff --git a/test/core/fling/server.c b/test/core/fling/server.c index ac02407060..468013c3ec 100644 --- a/test/core/fling/server.c +++ b/test/core/fling/server.c @@ -241,7 +241,8 @@ int main(int argc, char **argv) { shutdown_started = 1; } ev = grpc_completion_queue_next( - cq, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000000))); + cq, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000000))); s = ev.tag; switch (ev.type) { case GRPC_OP_COMPLETE: diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c index 9e76a76666..ca0b2d1519 100644 --- a/test/core/httpcli/httpcli_test.c +++ b/test/core/httpcli/httpcli_test.c @@ -145,7 +145,8 @@ int main(int argc, char **argv) { gpr_free(args[0]); gpr_free(args[2]); - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(5))); + gpr_sleep_until( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(5))); grpc_test_init(argc, argv); grpc_init(); diff --git a/test/core/iomgr/alarm_list_test.c b/test/core/iomgr/alarm_list_test.c index 461c268a23..225c449d4b 100644 --- a/test/core/iomgr/alarm_list_test.c +++ b/test/core/iomgr/alarm_list_test.c @@ -61,13 +61,13 @@ static void add_test(void) { /* 10 ms alarms. will expire in the current epoch */ for (i = 0; i < 10; i++) { grpc_alarm_init(&alarms[i], gpr_time_add(start, gpr_time_from_millis(10)), - cb, (void *)(gpr_intptr) i, start); + cb, (void *)(gpr_intptr)i, start); } /* 1010 ms alarms. will expire in the next epoch */ for (i = 10; i < 20; i++) { grpc_alarm_init(&alarms[i], gpr_time_add(start, gpr_time_from_millis(1010)), - cb, (void *)(gpr_intptr) i, start); + cb, (void *)(gpr_intptr)i, start); } /* collect alarms. Only the first batch should be ready. */ @@ -115,15 +115,15 @@ void destruction_test(void) { memset(cb_called, 0, sizeof(cb_called)); grpc_alarm_init(&alarms[0], gpr_time_from_millis(100), cb, - (void *)(gpr_intptr) 0, gpr_time_0); + (void *)(gpr_intptr)0, gpr_time_0); grpc_alarm_init(&alarms[1], gpr_time_from_millis(3), cb, - (void *)(gpr_intptr) 1, gpr_time_0); + (void *)(gpr_intptr)1, gpr_time_0); grpc_alarm_init(&alarms[2], gpr_time_from_millis(100), cb, - (void *)(gpr_intptr) 2, gpr_time_0); + (void *)(gpr_intptr)2, gpr_time_0); grpc_alarm_init(&alarms[3], gpr_time_from_millis(3), cb, - (void *)(gpr_intptr) 3, gpr_time_0); + (void *)(gpr_intptr)3, gpr_time_0); grpc_alarm_init(&alarms[4], gpr_time_from_millis(1), cb, - (void *)(gpr_intptr) 4, gpr_time_0); + (void *)(gpr_intptr)4, gpr_time_0); GPR_ASSERT(1 == grpc_alarm_check(NULL, gpr_time_from_millis(2), NULL)); GPR_ASSERT(1 == cb_called[4][1]); grpc_alarm_cancel(&alarms[0]); diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 38f043339c..710cd725df 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -188,7 +188,8 @@ void test_times_out(void) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (gpr_time_cmp(gpr_time_add(connect_deadline, gpr_time_from_seconds(2)), gpr_now(GPR_CLOCK_REALTIME)) > 0) { - int is_after_deadline = gpr_time_cmp(connect_deadline, gpr_now(GPR_CLOCK_REALTIME)) <= 0; + int is_after_deadline = + gpr_time_cmp(connect_deadline, gpr_now(GPR_CLOCK_REALTIME)) <= 0; if (is_after_deadline && gpr_time_cmp(gpr_time_add(connect_deadline, gpr_time_from_seconds(1)), gpr_now(GPR_CLOCK_REALTIME)) > 0) { diff --git a/test/core/support/cancellable_test.c b/test/core/support/cancellable_test.c index e5ed77c7ca..2f4b67a785 100644 --- a/test/core/support/cancellable_test.c +++ b/test/core/support/cancellable_test.c @@ -82,8 +82,9 @@ static void test(void) { /* Test timeout on event wait for uncancelled gpr_cancellable */ interval = gpr_now(GPR_CLOCK_REALTIME); - gpr_event_cancellable_wait( - &t.ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000000)), &t.cancel); + gpr_event_cancellable_wait(&t.ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000000)), + &t.cancel); interval = gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), interval); GPR_ASSERT(gpr_time_cmp(interval, gpr_time_from_micros(500000)) >= 0); GPR_ASSERT(gpr_time_cmp(gpr_time_from_micros(2000000), interval) >= 0); @@ -92,9 +93,9 @@ static void test(void) { gpr_mu_lock(&t.mu); interval = gpr_now(GPR_CLOCK_REALTIME); while (!gpr_cv_cancellable_wait( - &t.cv, &t.mu, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000000)), - &t.cancel)) { + &t.cv, &t.mu, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000000)), + &t.cancel)) { } interval = gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), interval); GPR_ASSERT(gpr_time_cmp(interval, gpr_time_from_micros(500000)) >= 0); @@ -112,8 +113,8 @@ static void test(void) { /* Wait a second, and check that no threads have finished waiting. */ gpr_mu_lock(&t.mu); - gpr_cv_wait(&t.cv, &t.mu, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000000))); + gpr_cv_wait(&t.cv, &t.mu, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000000))); GPR_ASSERT(t.n == n); gpr_mu_unlock(&t.mu); @@ -131,9 +132,9 @@ static void test(void) { gpr_mu_lock(&t.mu); interval = gpr_now(GPR_CLOCK_REALTIME); while (!gpr_cv_cancellable_wait( - &t.cv, &t.mu, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000000)), - &t.cancel)) { + &t.cv, &t.mu, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000000)), + &t.cancel)) { } interval = gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), interval); GPR_ASSERT(gpr_time_cmp(gpr_time_from_micros(100000), interval) >= 0); @@ -141,8 +142,9 @@ static void test(void) { /* Test timeout on event wait for cancelled gpr_cancellable */ interval = gpr_now(GPR_CLOCK_REALTIME); - gpr_event_cancellable_wait( - &t.ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000000)), &t.cancel); + gpr_event_cancellable_wait(&t.ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000000)), + &t.cancel); interval = gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), interval); GPR_ASSERT(gpr_time_cmp(gpr_time_from_micros(100000), interval) >= 0); diff --git a/test/core/support/sync_test.c b/test/core/support/sync_test.c index bf5829c415..99be5cdc90 100644 --- a/test/core/support/sync_test.c +++ b/test/core/support/sync_test.c @@ -323,7 +323,8 @@ static void inc_with_1ms_delay(void *v /*=m*/) { for (i = 0; i != m->iterations; i++) { gpr_timespec deadline; gpr_mu_lock(&m->mu); - deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000)); + deadline = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000)); while (!gpr_cv_wait(&m->cv, &m->mu, deadline)) { } m->counter++; @@ -339,7 +340,8 @@ static void inc_with_1ms_delay_event(void *v /*=m*/) { gpr_int64 i; for (i = 0; i != m->iterations; i++) { gpr_timespec deadline; - deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000)); + deadline = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000)); GPR_ASSERT(gpr_event_wait(&m->event, deadline) == NULL); gpr_mu_lock(&m->mu); m->counter++; @@ -382,9 +384,9 @@ static void consumer(void *v /*=m*/) { gpr_mu_lock(&m->mu); m->counter = n; gpr_mu_unlock(&m->mu); - GPR_ASSERT( - !queue_remove(&m->q, &value, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(1000000)))); + GPR_ASSERT(!queue_remove(&m->q, &value, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000000)))); mark_thread_done(m); } diff --git a/test/core/util/test_config.h b/test/core/util/test_config.h index 3db13f5739..3218953166 100644 --- a/test/core/util/test_config.h +++ b/test/core/util/test_config.h @@ -52,11 +52,11 @@ extern "C" { (GRPC_TEST_SLOWDOWN_BUILD_FACTOR * GRPC_TEST_SLOWDOWN_MACHINE_FACTOR) #define GRPC_TIMEOUT_SECONDS_TO_DEADLINE(x) \ - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), \ + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), \ gpr_time_from_micros(GRPC_TEST_SLOWDOWN_FACTOR * 1e6 * (x))) -#define GRPC_TIMEOUT_MILLIS_TO_DEADLINE(x) \ - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), \ +#define GRPC_TIMEOUT_MILLIS_TO_DEADLINE(x) \ + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), \ gpr_time_from_micros(GRPC_TEST_SLOWDOWN_FACTOR * 1e3 * (x))) #ifndef GRPC_TEST_CUSTOM_PICK_PORT @@ -69,4 +69,4 @@ void grpc_test_init(int argc, char **argv); } #endif /* __cplusplus */ -#endif /* GRPC_TEST_CORE_UTIL_TEST_CONFIG_H */ +#endif /* GRPC_TEST_CORE_UTIL_TEST_CONFIG_H */ -- cgit v1.2.3 From ab54f793ffbf8f1e805f85d6b0a95c93e619baec Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 8 Jul 2015 08:34:20 -0700 Subject: Reduce spam --- src/core/iomgr/iomgr.c | 5 +++++ src/core/surface/server.c | 36 ++++++++++++++++++++++++++++-------- test/core/util/grpc_profiler.c | 16 +++++++++++----- 3 files changed, 44 insertions(+), 13 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index c507e7c26a..4a2c45a023 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -111,10 +111,13 @@ void grpc_iomgr_shutdown(void) { grpc_iomgr_closure *closure; gpr_timespec shutdown_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); + gpr_timespec last_warning_time = gpr_now(); gpr_mu_lock(&g_mu); g_shutdown = 1; while (g_cbs_head != NULL || g_root_object.next != &g_root_object) { + if (gpr_time_cmp(gpr_time_sub(gpr_now(), last_warning_time), + gpr_time_from_seconds(1)) >= 0) { if (g_cbs_head != NULL && g_root_object.next != &g_root_object) { gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed and executing " @@ -126,6 +129,8 @@ void grpc_iomgr_shutdown(void) { gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed", count_objects()); } + last_warning_time = gpr_now(); + } if (g_cbs_head) { do { closure = g_cbs_head; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 341ca2942c..4053b22063 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -166,6 +166,9 @@ struct grpc_server { listener *listeners; int listeners_destroyed; gpr_refcount internal_refcount; + + /** when did we print the last shutdown progress message */ + gpr_timespec last_shutdown_message_time; }; typedef enum { @@ -476,20 +479,35 @@ static int num_listeners(grpc_server *server) { return n; } +static int num_channels(grpc_server *server) { + channel_data *chand; + int n = 0; + for (chand = server->root_channel_data.next; + chand != &server->root_channel_data; chand = chand->next) { + n++; + } + return n; +} + static void maybe_finish_shutdown(grpc_server *server) { size_t i; if (!server->shutdown || server->shutdown_published) { return; } - if (server->root_channel_data.next != &server->root_channel_data) { - gpr_log(GPR_DEBUG, - "Waiting for all channels to close before destroying server"); - return; - } - if (server->listeners_destroyed < num_listeners(server)) { - gpr_log(GPR_DEBUG, "Waiting for all listeners to be destroyed (@ %d/%d)", - server->listeners_destroyed, num_listeners(server)); + if (server->root_channel_data.next != &server->root_channel_data || + server->listeners_destroyed < num_listeners(server)) { + if (gpr_time_cmp( + gpr_time_sub(gpr_now(), server->last_shutdown_message_time), + gpr_time_from_seconds(1)) >= 0) { + server->last_shutdown_message_time = gpr_now(); + gpr_log(GPR_DEBUG, + "Waiting for %d channels and %d/%d listeners to be destroyed" + " before shutting down server", + num_channels(server), + num_listeners(server) - server->listeners_destroyed, + num_listeners(server)); + } return; } server->shutdown_published = 1; @@ -930,6 +948,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, return; } + server->last_shutdown_message_time = gpr_now(); + channel_broadcaster_init(server, &broadcaster); /* collect all unregistered then registered calls */ diff --git a/test/core/util/grpc_profiler.c b/test/core/util/grpc_profiler.c index d5b6cfeef1..c2c0c9cf53 100644 --- a/test/core/util/grpc_profiler.c +++ b/test/core/util/grpc_profiler.c @@ -43,11 +43,17 @@ void grpc_profiler_stop() { ProfilerStop(); } #include void grpc_profiler_start(const char *filename) { - gpr_log(GPR_DEBUG, - "You do not have google-perftools installed, profiling is disabled [for %s]", filename); - gpr_log(GPR_DEBUG, - "To install on ubuntu: sudo apt-get install google-perftools " - "libgoogle-perftools-dev"); + static int printed_warning = 0; + if (!printed_warning) { + gpr_log(GPR_DEBUG, + "You do not have google-perftools installed, profiling is disabled " + "[for %s]", + filename); + gpr_log(GPR_DEBUG, + "To install on ubuntu: sudo apt-get install google-perftools " + "libgoogle-perftools-dev"); + printed_warning = 1; + } } void grpc_profiler_stop(void) {} -- cgit v1.2.3 From 97fc6a3f3f6e3bbb39b6fda1444bc533deb8803d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 8 Jul 2015 15:31:35 -0700 Subject: Rewrite completion queue internals to use pre-allocation of events --- src/core/iomgr/pollset_multipoller_with_epoll.c | 3 +- src/core/surface/call.c | 42 ++++- src/core/surface/completion_queue.c | 216 +++++++++--------------- src/core/surface/completion_queue.h | 22 ++- src/core/surface/server.c | 165 +++++++++--------- test/core/surface/completion_queue_test.c | 24 ++- 6 files changed, 227 insertions(+), 245 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 1900bbf9e1..3746c8edaf 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -105,10 +105,11 @@ static void multipoll_with_epoll_pollset_maybe_work( * here. */ - timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); pollset->counter += 1; gpr_mu_unlock(&pollset->mu); + timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); + do { ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms); if (ep_rv < 0) { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index a28a542c8d..445111ca40 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -162,6 +162,8 @@ struct grpc_call { gpr_uint8 error_status_set; /** should the alarm be cancelled */ gpr_uint8 cancel_alarm; + /** bitmask of allocated completion events in completions */ + gpr_uint8 allocated_completions; /* flags with bits corresponding to write states allowing us to determine what was sent */ @@ -250,6 +252,9 @@ struct grpc_call { grpc_iomgr_closure on_done_recv; grpc_iomgr_closure on_done_send; grpc_iomgr_closure on_done_bind; + + /** completion events - for completion queue use */ + grpc_cq_completion completions[6]; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -349,6 +354,27 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { return call->cq; } +grpc_cq_completion *allocate_completion(grpc_call *call) { + gpr_uint8 i; + for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) { + if (call->allocated_completions & (1u << i)) { + continue; + } + call->allocated_completions |= 1u << i; + return &call->completions[i]; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); +} + +void done_completion(void *call, grpc_cq_completion *completion) { + grpc_call *c = call; + gpr_mu_lock(&c->mu); + c->allocated_completions &= ~(1u << (completion - c->completions)); + gpr_mu_unlock(&c->mu); + GRPC_CALL_INTERNAL_UNREF(c, "completion", 1); +} + #ifdef GRPC_CALL_REF_COUNT_DEBUG void grpc_call_internal_ref(grpc_call *c, const char *reason) { gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c, @@ -1316,11 +1342,15 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, tag, call, success); + grpc_cq_end_op(call->cq, + tag, success, done_completion, call, + allocate_completion(call)); } static void finish_batch_with_close(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, tag, call, 1); + grpc_cq_end_op(call->cq, + tag, 1, done_completion, call, + allocate_completion(call)); } static int are_write_flags_valid(gpr_uint32 flags) { @@ -1343,8 +1373,9 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); if (nops == 0) { - grpc_cq_begin_op(call->cq, call); - grpc_cq_end_op(call->cq, tag, call, 1); + grpc_cq_begin_op(call->cq); + GRPC_CALL_INTERNAL_REF(call, "completion"); + grpc_cq_end_op(call->cq, tag, 1, done_completion, call, allocate_completion(call)); return GRPC_CALL_OK; } @@ -1466,7 +1497,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, } } - grpc_cq_begin_op(call->cq, call); + GRPC_CALL_INTERNAL_REF(call, "completion"); + grpc_cq_begin_op(call->cq); return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 030a8b4e6f..86481af02c 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -45,34 +45,20 @@ #include #include -#define NUM_TAG_BUCKETS 31 - -/* A single event: extends grpc_event to form a linked list with a destruction - function (on_finish) that is hidden from outside this module */ -typedef struct event { - grpc_event base; - struct event *queue_next; - struct event *queue_prev; - struct event *bucket_next; - struct event *bucket_prev; -} event; - /* Completion queue structure */ struct grpc_completion_queue { - /* When refs drops to zero, we are in shutdown mode, and will be destroyable - once all queued events are drained */ - gpr_refcount refs; - /* Once owning_refs drops to zero, we will destroy the cq */ + /** completed events */ + grpc_cq_completion completed_head; + grpc_cq_completion *completed_tail; + /** Number of pending events (+1 if we're not shutdown) */ + gpr_refcount pending_events; + /** Once owning_refs drops to zero, we will destroy the cq */ gpr_refcount owning_refs; - /* the set of low level i/o things that concern this cq */ + /** the set of low level i/o things that concern this cq */ grpc_pollset pollset; - /* 0 initially, 1 once we've begun shutting down */ + /** 0 initially, 1 once we've begun shutting down */ int shutdown; int shutdown_called; - /* Head of a linked list of queued events (prev points to the last element) */ - event *queue; - /* Fixed size chained hash table of events for pluck() */ - event *buckets[NUM_TAG_BUCKETS]; int is_server_cq; }; @@ -80,10 +66,12 @@ grpc_completion_queue *grpc_completion_queue_create(void) { grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); memset(cc, 0, sizeof(*cc)); /* Initial ref is dropped by grpc_completion_queue_shutdown */ - gpr_ref_init(&cc->refs, 1); + gpr_ref_init(&cc->pending_events, 1); /* One for destroy(), one for pollset_shutdown */ gpr_ref_init(&cc->owning_refs, 2); grpc_pollset_init(&cc->pollset); + cc->completed_tail = &cc->completed_head; + cc->completed_head.next = (gpr_uintptr) cc->completed_tail; return cc; } @@ -114,179 +102,127 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { - GPR_ASSERT(cc->queue == NULL); + GPR_ASSERT(cc->completed_head.next == (gpr_uintptr) &cc->completed_head); grpc_pollset_destroy(&cc->pollset); gpr_free(cc); } } -/* Create and append an event to the queue. Returns the event so that its data - members can be filled in. - Requires GRPC_POLLSET_MU(&cc->pollset) locked. */ -static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, - void *tag, grpc_call *call) { - event *ev = gpr_malloc(sizeof(event)); - gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS; - ev->base.type = type; - ev->base.tag = tag; - if (cc->queue == NULL) { - cc->queue = ev->queue_next = ev->queue_prev = ev; - } else { - ev->queue_next = cc->queue; - ev->queue_prev = cc->queue->queue_prev; - ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev; - } - if (cc->buckets[bucket] == NULL) { - cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev; - } else { - ev->bucket_next = cc->buckets[bucket]; - ev->bucket_prev = cc->buckets[bucket]->bucket_prev; - ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev; - } - grpc_pollset_kick(&cc->pollset); - return ev; -} - -void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) { - gpr_ref(&cc->refs); - if (call) GRPC_CALL_INTERNAL_REF(call, "cq"); +void grpc_cq_begin_op(grpc_completion_queue *cc) { + gpr_ref(&cc->pending_events); } /* Signal the end of an operation - if this is the last waiting-to-be-queued event, then enter shutdown mode */ -void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, - int success) { - event *ev; - int shutdown = 0; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call); - ev->base.success = success; - if (gpr_unref(&cc->refs)) { +/* Queue a GRPC_OP_COMPLETED operation */ +void grpc_cq_end_op( + grpc_completion_queue *cc, + void *tag, + int success, + void (*done)(void *done_arg, grpc_cq_completion *storage), + void *done_arg, + grpc_cq_completion *storage) { + int shutdown = gpr_unref(&cc->pending_events); + + storage->tag = tag; + storage->done = done; + storage->done_arg = done_arg; + storage->next = ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0)); + + if (!shutdown) { + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); + cc->completed_tail = storage; + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + } else { + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); + cc->completed_tail = storage; GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; - shutdown = 1; - } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0); - if (shutdown) { + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); } } -/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ -static event *create_shutdown_event(void) { - event *ev = gpr_malloc(sizeof(event)); - ev->base.type = GRPC_QUEUE_SHUTDOWN; - ev->base.tag = NULL; - return ev; -} - grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline) { - event *ev = NULL; grpc_event ret; GRPC_CQ_INTERNAL_REF(cc, "next"); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { - if (cc->queue != NULL) { - gpr_uintptr bucket; - ev = cc->queue; - bucket = ((gpr_uintptr)ev->base.tag) % NUM_TAG_BUCKETS; - cc->queue = ev->queue_next; - ev->queue_next->queue_prev = ev->queue_prev; - ev->queue_prev->queue_next = ev->queue_next; - ev->bucket_next->bucket_prev = ev->bucket_prev; - ev->bucket_prev->bucket_next = ev->bucket_next; - if (ev == cc->buckets[bucket]) { - cc->buckets[bucket] = ev->bucket_next; - if (ev == cc->buckets[bucket]) { - cc->buckets[bucket] = NULL; - } - } - if (cc->queue == ev) { - cc->queue = NULL; + if (cc->completed_tail != &cc->completed_head) { + grpc_cq_completion *c = (grpc_cq_completion *) cc->completed_head.next; + cc->completed_head.next = c->next & ~(gpr_uintptr)1; + if (c == cc->completed_tail) { + cc->completed_tail = &cc->completed_head; } + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + ret.type = GRPC_OP_COMPLETE; + ret.success = c->next & 1u; + ret.tag = c->tag; + c->done(c->done_arg, c); break; } if (cc->shutdown) { - ev = create_shutdown_event(); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_SHUTDOWN; break; } if (!grpc_pollset_work(&cc->pollset, deadline)) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(cc, "next"); - return ret; + break; } } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - ret = ev->base; - gpr_free(ev); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); return ret; } -static event *pluck_event(grpc_completion_queue *cc, void *tag) { - gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS; - event *ev = cc->buckets[bucket]; - if (ev == NULL) return NULL; - do { - if (ev->base.tag == tag) { - ev->queue_next->queue_prev = ev->queue_prev; - ev->queue_prev->queue_next = ev->queue_next; - ev->bucket_next->bucket_prev = ev->bucket_prev; - ev->bucket_prev->bucket_next = ev->bucket_next; - if (ev == cc->buckets[bucket]) { - cc->buckets[bucket] = ev->bucket_next; - if (ev == cc->buckets[bucket]) { - cc->buckets[bucket] = NULL; - } - } - if (cc->queue == ev) { - cc->queue = ev->queue_next; - if (cc->queue == ev) { - cc->queue = NULL; - } - } - return ev; - } - ev = ev->bucket_next; - } while (ev != cc->buckets[bucket]); - return NULL; -} - grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec deadline) { - event *ev = NULL; grpc_event ret; + grpc_cq_completion *c; + grpc_cq_completion *prev; GRPC_CQ_INTERNAL_REF(cc, "pluck"); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { - if ((ev = pluck_event(cc, tag))) { - break; + prev = &cc->completed_head; + while ((c = (grpc_cq_completion*)(prev->next & ~(gpr_uintptr)1)) != &cc->completed_head) { + if (c->tag == tag) { + prev->next = (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1); + if (c == cc->completed_tail) { + cc->completed_tail = prev; + } + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + ret.type = GRPC_OP_COMPLETE; + ret.success = c->next & 1u; + ret.tag = c->tag; + c->done(c->done_arg, c); + goto done; + } + prev = c; } if (cc->shutdown) { - ev = create_shutdown_event(); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_SHUTDOWN; break; } if (!grpc_pollset_work(&cc->pollset, deadline)) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); - return ret; + break; } } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - ret = ev->base; - gpr_free(ev); +done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); return ret; @@ -303,7 +239,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { cc->shutdown_called = 1; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - if (gpr_unref(&cc->refs)) { + if (gpr_unref(&cc->pending_events)) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 1b9010f462..f926d411f3 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -39,6 +39,17 @@ #include "src/core/iomgr/pollset.h" #include +typedef struct grpc_cq_completion { + /** user supplied tag */ + void *tag; + /** done callback - called when this queue element is no longer + needed by the completion queue */ + void (*done)(void *done_arg, struct grpc_cq_completion *c); + void *done_arg; + /** next pointer; low bit is used to indicate success or not */ + gpr_uintptr next; +} grpc_cq_completion; + #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line); @@ -57,11 +68,16 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc); /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made */ -void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call); +void grpc_cq_begin_op(grpc_completion_queue *cc); /* Queue a GRPC_OP_COMPLETED operation */ -void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, - int success); +void grpc_cq_end_op( + grpc_completion_queue *cc, + void *tag, + int success, + void (*done)(void *done_arg, grpc_cq_completion *storage), + void *done_arg, + grpc_cq_completion *storage); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 341ca2942c..2c9115e9ee 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -72,12 +72,14 @@ typedef struct { typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; -typedef struct { +typedef struct requested_call { requested_call_type type; + struct requested_call *next; void *tag; grpc_completion_queue *cq_bound_to_call; grpc_completion_queue *cq_for_notification; grpc_call **call; + grpc_cq_completion completion; union { struct { grpc_call_details *details; @@ -92,17 +94,11 @@ typedef struct { } data; } requested_call; -typedef struct { - requested_call *calls; - size_t count; - size_t capacity; -} requested_call_array; - struct registered_method { char *method; char *host; call_data *pending; - requested_call_array requested; + requested_call *requests; registered_method *next; }; @@ -131,6 +127,7 @@ struct channel_data { typedef struct shutdown_tag { void *tag; grpc_completion_queue *cq; + grpc_cq_completion completion; } shutdown_tag; struct grpc_server { @@ -153,7 +150,7 @@ struct grpc_server { gpr_mu mu_call; /* mutex for call-specific state */ registered_method *registered_methods; - requested_call_array requested_calls; + requested_call *requests; gpr_uint8 shutdown; gpr_uint8 shutdown_published; @@ -325,22 +322,6 @@ static int call_list_remove(call_data *call, call_list list) { return 1; } -static void requested_call_array_destroy(requested_call_array *array) { - gpr_free(array->calls); -} - -static requested_call *requested_call_array_add(requested_call_array *array) { - requested_call *rc; - if (array->count == array->capacity) { - array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2); - array->calls = - gpr_realloc(array->calls, sizeof(requested_call) * array->capacity); - } - rc = &array->calls[array->count++]; - memset(rc, 0, sizeof(*rc)); - return rc; -} - static void server_ref(grpc_server *server) { gpr_ref(&server->internal_refcount); } @@ -352,12 +333,10 @@ static void server_delete(grpc_server *server) { gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_call); gpr_free(server->channel_filters); - requested_call_array_destroy(&server->requested_calls); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; gpr_free(rm->method); gpr_free(rm->host); - requested_call_array_destroy(&rm->requested); gpr_free(rm); } for (i = 0; i < server->cq_count; i++) { @@ -406,18 +385,18 @@ static void destroy_channel(channel_data *chand) { static void finish_start_new_rpc_and_unlock(grpc_server *server, grpc_call_element *elem, call_data **pending_root, - requested_call_array *array) { - requested_call rc; + requested_call **requests) { + requested_call *rc = *requests; call_data *calld = elem->call_data; - if (array->count == 0) { + if (rc == NULL) { calld->state = PENDING; call_list_join(pending_root, calld, PENDING_START); gpr_mu_unlock(&server->mu_call); } else { - rc = array->calls[--array->count]; + *requests = rc->next; calld->state = ACTIVATED; gpr_mu_unlock(&server->mu_call); - begin_call(server, calld, &rc); + begin_call(server, calld, rc); } } @@ -442,7 +421,7 @@ static void start_new_rpc(grpc_call_element *elem) { if (rm->method != calld->path) continue; finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, - &rm->server_registered_method->requested); + &rm->server_registered_method->requests); return; } /* check for a wildcard method definition (no host set) */ @@ -455,12 +434,12 @@ static void start_new_rpc(grpc_call_element *elem) { if (rm->method != calld->path) continue; finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, - &rm->server_registered_method->requested); + &rm->server_registered_method->requests); return; } } finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], - &server->requested_calls); + &server->requests); } static void kill_zombie(void *elem, int success) { @@ -476,6 +455,10 @@ static int num_listeners(grpc_server *server) { return n; } +static void done_shutdown_event(void *server, grpc_cq_completion *completion) { + server_unref(server); +} + static void maybe_finish_shutdown(grpc_server *server) { size_t i; if (!server->shutdown || server->shutdown_published) { @@ -494,8 +477,14 @@ static void maybe_finish_shutdown(grpc_server *server) { } server->shutdown_published = 1; for (i = 0; i < server->num_shutdown_tags; i++) { - grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, - NULL, 1); + server_ref(server); + grpc_cq_end_op(server->shutdown_tags[i].cq, + server->shutdown_tags[i].tag, + 1, + done_shutdown_event, + server, + &server->shutdown_tags[i].completion + ); } } @@ -910,15 +899,14 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; - requested_call_array requested_calls; - size_t i; + requested_call *requests = NULL; registered_method *rm; shutdown_tag *sdt; channel_broadcaster broadcaster; /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); - grpc_cq_begin_op(cq, NULL); + grpc_cq_begin_op(cq); server->shutdown_tags = gpr_realloc(server->shutdown_tags, sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)); @@ -934,23 +922,15 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - requested_calls = server->requested_calls; - memset(&server->requested_calls, 0, sizeof(server->requested_calls)); + requests = server->requests; + server->requests = NULL; for (rm = server->registered_methods; rm; rm = rm->next) { - if (requested_calls.count + rm->requested.count > - requested_calls.capacity) { - requested_calls.capacity = - GPR_MAX(requested_calls.count + rm->requested.count, - 2 * requested_calls.capacity); - requested_calls.calls = - gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) * - requested_calls.capacity); + while (rm->requests != NULL) { + requested_call *c = rm->requests; + rm->requests = c->next; + c->next = requests; + requests = c; } - memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls, - sizeof(*requested_calls.calls) * rm->requested.count); - requested_calls.count += rm->requested.count; - gpr_free(rm->requested.calls); - memset(&rm->requested, 0, sizeof(rm->requested)); } gpr_mu_unlock(&server->mu_call); @@ -959,10 +939,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server, gpr_mu_unlock(&server->mu_global); /* terminate all the requested calls */ - for (i = 0; i < requested_calls.count; i++) { - fail_call(server, &requested_calls.calls[i]); + while (requests != NULL) { + requested_call *next = requests->next; + fail_call(server, requests); + requests = next; } - gpr_free(requested_calls.calls); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { @@ -1024,7 +1005,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg, static grpc_call_error queue_call_request(grpc_server *server, requested_call *rc) { call_data *calld = NULL; - requested_call_array *requested_calls = NULL; + requested_call **requests; gpr_mu_lock(&server->mu_call); if (server->shutdown) { gpr_mu_unlock(&server->mu_call); @@ -1035,12 +1016,12 @@ static grpc_call_error queue_call_request(grpc_server *server, case BATCH_CALL: calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START); - requested_calls = &server->requested_calls; + requests = &server->requests; break; case REGISTERED_CALL: calld = call_list_remove_head( &rc->data.registered.registered_method->pending, PENDING_START); - requested_calls = &rc->data.registered.registered_method->requested; + requests = &rc->data.registered.registered_method->requests; break; } if (calld) { @@ -1050,7 +1031,8 @@ static grpc_call_error queue_call_request(grpc_server *server, begin_call(server, calld, rc); return GRPC_CALL_OK; } else { - *requested_call_array_add(requested_calls) = *rc; + rc->next = *requests; + *requests = rc; gpr_mu_unlock(&server->mu_call); return GRPC_CALL_OK; } @@ -1061,22 +1043,23 @@ grpc_call_error grpc_server_request_call( grpc_metadata_array *initial_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag) { - requested_call rc; + requested_call *rc = gpr_malloc(sizeof(*rc)); GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag); if (!grpc_cq_is_server_cq(cq_for_notification)) { + gpr_free(rc); return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; } - grpc_cq_begin_op(cq_for_notification, NULL); - rc.type = BATCH_CALL; - rc.tag = tag; - rc.cq_bound_to_call = cq_bound_to_call; - rc.cq_for_notification = cq_for_notification; - rc.call = call; - rc.data.batch.details = details; - rc.data.batch.initial_metadata = initial_metadata; - return queue_call_request(server, &rc); + grpc_cq_begin_op(cq_for_notification); + rc->type = BATCH_CALL; + rc->tag = tag; + rc->cq_bound_to_call = cq_bound_to_call; + rc->cq_for_notification = cq_for_notification; + rc->call = call; + rc->data.batch.details = details; + rc->data.batch.initial_metadata = initial_metadata; + return queue_call_request(server, rc); } grpc_call_error grpc_server_request_registered_call( @@ -1084,22 +1067,23 @@ grpc_call_error grpc_server_request_registered_call( grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag) { - requested_call rc; + requested_call *rc = gpr_malloc(sizeof(*rc)); registered_method *registered_method = rm; if (!grpc_cq_is_server_cq(cq_for_notification)) { + gpr_free(rc); return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; } - grpc_cq_begin_op(cq_for_notification, NULL); - rc.type = REGISTERED_CALL; - rc.tag = tag; - rc.cq_bound_to_call = cq_bound_to_call; - rc.cq_for_notification = cq_for_notification; - rc.call = call; - rc.data.registered.registered_method = registered_method; - rc.data.registered.deadline = deadline; - rc.data.registered.initial_metadata = initial_metadata; - rc.data.registered.optional_payload = optional_payload; - return queue_call_request(server, &rc); + grpc_cq_begin_op(cq_for_notification); + rc->type = REGISTERED_CALL; + rc->tag = tag; + rc->cq_bound_to_call = cq_bound_to_call; + rc->cq_for_notification = cq_for_notification; + rc->call = call; + rc->data.registered.registered_method = registered_method; + rc->data.registered.deadline = deadline; + rc->data.registered.initial_metadata = initial_metadata; + rc->data.registered.optional_payload = optional_payload; + return queue_call_request(server, rc); } static void publish_registered_or_batch(grpc_call *call, int success, @@ -1167,7 +1151,11 @@ static void begin_call(grpc_server *server, call_data *calld, GRPC_CALL_INTERNAL_REF(calld->call, "server"); grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, - rc->tag); + rc); +} + +static void done_request_event(void *req, grpc_cq_completion *c) { + gpr_free(req); } static void fail_call(grpc_server *server, requested_call *rc) { @@ -1180,15 +1168,16 @@ static void fail_call(grpc_server *server, requested_call *rc) { rc->data.registered.initial_metadata->count = 0; break; } - grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0); + grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion); } static void publish_registered_or_batch(grpc_call *call, int success, - void *tag) { + void *prc) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + requested_call *rc = prc; call_data *calld = elem->call_data; - grpc_cq_end_op(calld->cq_new, tag, call, success); + grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, &rc->completion); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index eba24f5c6e..5b1d9cada9 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -74,17 +74,20 @@ static void test_wait_empty(void) { shutdown_and_destroy(cc); } +static void do_nothing_end_completion(void *arg, grpc_cq_completion *c) {} + static void test_cq_end_op(void) { grpc_event ev; grpc_completion_queue *cc; + grpc_cq_completion completion; void *tag = create_test_tag(); LOG_TEST("test_cq_end_op"); cc = grpc_completion_queue_create(); - grpc_cq_begin_op(cc, NULL); - grpc_cq_end_op(cc, tag, NULL, 1); + grpc_cq_begin_op(cc); + grpc_cq_end_op(cc, tag, 1, do_nothing_end_completion, NULL, &completion); ev = grpc_completion_queue_next(cc, gpr_inf_past); GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); @@ -120,6 +123,7 @@ static void test_pluck(void) { grpc_event ev; grpc_completion_queue *cc; void *tags[128]; + grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)]; unsigned i, j; LOG_TEST("test_pluck"); @@ -134,8 +138,8 @@ static void test_pluck(void) { cc = grpc_completion_queue_create(); for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - grpc_cq_begin_op(cc, NULL); - grpc_cq_end_op(cc, tags[i], NULL, 1); + grpc_cq_begin_op(cc); + grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, &completions[i]); } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { @@ -144,8 +148,8 @@ static void test_pluck(void) { } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - grpc_cq_begin_op(cc, NULL); - grpc_cq_end_op(cc, tags[i], NULL, 1); + grpc_cq_begin_op(cc); + grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, &completions[i]); } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { @@ -174,6 +178,10 @@ gpr_timespec ten_seconds_time(void) { return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1); } +static void free_completion(void *arg, grpc_cq_completion *completion) { + gpr_free(completion); +} + static void producer_thread(void *arg) { test_thread_options *opt = arg; int i; @@ -184,7 +192,7 @@ static void producer_thread(void *arg) { gpr_log(GPR_INFO, "producer %d phase 1", opt->id); for (i = 0; i < TEST_THREAD_EVENTS; i++) { - grpc_cq_begin_op(opt->cc, NULL); + grpc_cq_begin_op(opt->cc); } gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id); @@ -193,7 +201,7 @@ static void producer_thread(void *arg) { gpr_log(GPR_INFO, "producer %d phase 2", opt->id); for (i = 0; i < TEST_THREAD_EVENTS; i++) { - grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, NULL, 1); + grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, 1, free_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); opt->events_triggered++; } -- cgit v1.2.3 From b4c55ea07294889a1f9ad5b3a334ae787b181e4c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 8 Jul 2015 15:39:27 -0700 Subject: Fix uninitialized variable --- src/core/surface/server.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core/surface') diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 2c9115e9ee..9a29060f2b 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -1005,7 +1005,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg, static grpc_call_error queue_call_request(grpc_server *server, requested_call *rc) { call_data *calld = NULL; - requested_call **requests; + requested_call **requests = NULL; gpr_mu_lock(&server->mu_call); if (server->shutdown) { gpr_mu_unlock(&server->mu_call); -- cgit v1.2.3 From c012258eb8bb0ef76ada117cbebcd458fc884f32 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Jul 2015 13:21:36 -0700 Subject: FIx memory leak --- src/core/surface/server.c | 1 + 1 file changed, 1 insertion(+) (limited to 'src/core/surface') diff --git a/src/core/surface/server.c b/src/core/surface/server.c index d2d9a7b038..ff02428c49 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -1195,6 +1195,7 @@ static void publish_registered_or_batch(grpc_call *call, int success, requested_call *rc = prc; call_data *calld = elem->call_data; grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, &rc->completion); + GRPC_CALL_INTERNAL_UNREF(call, "server", 0); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { -- cgit v1.2.3 From 12cf537a72468edd3860127c0eb37abd78120e34 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Jul 2015 13:48:11 -0700 Subject: clang-format affected files --- src/core/surface/call.c | 13 +++++----- src/core/surface/completion_queue.c | 41 +++++++++++++++---------------- src/core/surface/completion_queue.h | 12 +++------ src/core/surface/server.c | 22 ++++++++--------- test/core/surface/completion_queue_test.c | 9 ++++--- 5 files changed, 46 insertions(+), 51 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/surface/call.c b/src/core/surface/call.c index db382a5fe7..79a399c227 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1342,15 +1342,13 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, - tag, success, done_completion, call, - allocate_completion(call)); + grpc_cq_end_op(call->cq, tag, success, done_completion, call, + allocate_completion(call)); } static void finish_batch_with_close(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, - tag, 1, done_completion, call, - allocate_completion(call)); + grpc_cq_end_op(call->cq, tag, 1, done_completion, call, + allocate_completion(call)); } static int are_write_flags_valid(gpr_uint32 flags) { @@ -1375,7 +1373,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, if (nops == 0) { grpc_cq_begin_op(call->cq); GRPC_CALL_INTERNAL_REF(call, "completion"); - grpc_cq_end_op(call->cq, tag, 1, done_completion, call, allocate_completion(call)); + grpc_cq_end_op(call->cq, tag, 1, done_completion, call, + allocate_completion(call)); return GRPC_CALL_OK; } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 86481af02c..67f4443e9d 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -71,16 +71,15 @@ grpc_completion_queue *grpc_completion_queue_create(void) { gpr_ref_init(&cc->owning_refs, 2); grpc_pollset_init(&cc->pollset); cc->completed_tail = &cc->completed_head; - cc->completed_head.next = (gpr_uintptr) cc->completed_tail; + cc->completed_head.next = (gpr_uintptr)cc->completed_tail; return cc; } #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", - cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, - reason); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc, + (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason); #else void grpc_cq_internal_ref(grpc_completion_queue *cc) { #endif @@ -95,14 +94,13 @@ static void on_pollset_destroy_done(void *arg) { #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", - cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, - reason); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc, + (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason); #else void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { - GPR_ASSERT(cc->completed_head.next == (gpr_uintptr) &cc->completed_head); + GPR_ASSERT(cc->completed_head.next == (gpr_uintptr)&cc->completed_head); grpc_pollset_destroy(&cc->pollset); gpr_free(cc); } @@ -115,28 +113,27 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) { /* Signal the end of an operation - if this is the last waiting-to-be-queued event, then enter shutdown mode */ /* Queue a GRPC_OP_COMPLETED operation */ -void grpc_cq_end_op( - grpc_completion_queue *cc, - void *tag, - int success, - void (*done)(void *done_arg, grpc_cq_completion *storage), - void *done_arg, - grpc_cq_completion *storage) { +void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, + void (*done)(void *done_arg, grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { int shutdown = gpr_unref(&cc->pending_events); storage->tag = tag; storage->done = done; storage->done_arg = done_arg; - storage->next = ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0)); + storage->next = + ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0)); if (!shutdown) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); + cc->completed_tail->next = + ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); cc->completed_tail = storage; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } else { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); + cc->completed_tail->next = + ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); cc->completed_tail = storage; GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); @@ -154,7 +151,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { if (cc->completed_tail != &cc->completed_head) { - grpc_cq_completion *c = (grpc_cq_completion *) cc->completed_head.next; + grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; cc->completed_head.next = c->next & ~(gpr_uintptr)1; if (c == cc->completed_tail) { cc->completed_tail = &cc->completed_head; @@ -194,9 +191,11 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { prev = &cc->completed_head; - while ((c = (grpc_cq_completion*)(prev->next & ~(gpr_uintptr)1)) != &cc->completed_head) { + while ((c = (grpc_cq_completion *)(prev->next & ~(gpr_uintptr)1)) != + &cc->completed_head) { if (c->tag == tag) { - prev->next = (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1); + prev->next = + (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1); if (c == cc->completed_tail) { cc->completed_tail = prev; } diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index f926d411f3..f944f48d8e 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -40,7 +40,7 @@ #include typedef struct grpc_cq_completion { - /** user supplied tag */ + /** user supplied tag */ void *tag; /** done callback - called when this queue element is no longer needed by the completion queue */ @@ -71,13 +71,9 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc); void grpc_cq_begin_op(grpc_completion_queue *cc); /* Queue a GRPC_OP_COMPLETED operation */ -void grpc_cq_end_op( - grpc_completion_queue *cc, - void *tag, - int success, - void (*done)(void *done_arg, grpc_cq_completion *storage), - void *done_arg, - grpc_cq_completion *storage); +void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, + void (*done)(void *done_arg, grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index ff02428c49..32e5058b4d 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -267,7 +267,8 @@ static void send_shutdown(grpc_channel *channel, int send_goaway, } static void channel_broadcaster_shutdown(channel_broadcaster *cb, - int send_goaway, int force_disconnect) { + int send_goaway, + int force_disconnect) { size_t i; for (i = 0; i < cb->num_channels; i++) { @@ -484,13 +485,9 @@ static void maybe_finish_shutdown(grpc_server *server) { server->shutdown_published = 1; for (i = 0; i < server->num_shutdown_tags; i++) { server_ref(server); - grpc_cq_end_op(server->shutdown_tags[i].cq, - server->shutdown_tags[i].tag, - 1, - done_shutdown_event, - server, - &server->shutdown_tags[i].completion - ); + grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1, + done_shutdown_event, server, + &server->shutdown_tags[i].completion); } } @@ -1167,8 +1164,7 @@ static void begin_call(grpc_server *server, call_data *calld, } GRPC_CALL_INTERNAL_REF(calld->call, "server"); - grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, - rc); + grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc); } static void done_request_event(void *req, grpc_cq_completion *c) { @@ -1185,7 +1181,8 @@ static void fail_call(grpc_server *server, requested_call *rc) { rc->data.registered.initial_metadata->count = 0; break; } - grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion); + grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, + &rc->completion); } static void publish_registered_or_batch(grpc_call *call, int success, @@ -1194,7 +1191,8 @@ static void publish_registered_or_batch(grpc_call *call, int success, grpc_call_stack_element(grpc_call_get_call_stack(call), 0); requested_call *rc = prc; call_data *calld = elem->call_data; - grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, &rc->completion); + grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, + &rc->completion); GRPC_CALL_INTERNAL_UNREF(call, "server", 0); } diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index 5b1d9cada9..67e66a0a9f 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -139,7 +139,8 @@ static void test_pluck(void) { for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { grpc_cq_begin_op(cc); - grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, &completions[i]); + grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, + &completions[i]); } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { @@ -149,7 +150,8 @@ static void test_pluck(void) { for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { grpc_cq_begin_op(cc); - grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, &completions[i]); + grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, + &completions[i]); } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { @@ -201,7 +203,8 @@ static void producer_thread(void *arg) { gpr_log(GPR_INFO, "producer %d phase 2", opt->id); for (i = 0; i < TEST_THREAD_EVENTS; i++) { - grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, 1, free_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); + grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, 1, free_completion, NULL, + gpr_malloc(sizeof(grpc_cq_completion))); opt->events_triggered++; } -- cgit v1.2.3 From 396fab25ede9436f73f3fc2b81b6bf6abccbc871 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Jul 2015 14:14:09 -0700 Subject: Fix TSAN reported errors --- src/core/surface/call.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 79a399c227..2cb2cd6e98 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -135,6 +135,7 @@ struct grpc_call { grpc_mdctx *metadata_context; /* TODO(ctiller): share with cq if possible? */ gpr_mu mu; + gpr_mu completion_mu; /* how far through the stream have we read? */ read_state read_state; @@ -291,6 +292,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); memset(call, 0, sizeof(grpc_call)); gpr_mu_init(&call->mu); + gpr_mu_init(&call->completion_mu); call->channel = channel; call->cq = cq; if (cq) { @@ -356,11 +358,13 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { grpc_cq_completion *allocate_completion(grpc_call *call) { gpr_uint8 i; + gpr_mu_lock(&call->completion_mu); for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) { if (call->allocated_completions & (1u << i)) { continue; } call->allocated_completions |= 1u << i; + gpr_mu_unlock(&call->completion_mu); return &call->completions[i]; } gpr_log(GPR_ERROR, "should never reach here"); @@ -369,9 +373,9 @@ grpc_cq_completion *allocate_completion(grpc_call *call) { void done_completion(void *call, grpc_cq_completion *completion) { grpc_call *c = call; - gpr_mu_lock(&c->mu); + gpr_mu_lock(&c->completion_mu); c->allocated_completions &= ~(1u << (completion - c->completions)); - gpr_mu_unlock(&c->mu); + gpr_mu_unlock(&c->completion_mu); GRPC_CALL_INTERNAL_UNREF(c, "completion", 1); } @@ -391,6 +395,7 @@ static void destroy_call(void *call, int ignored_success) { grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call"); gpr_mu_destroy(&c->mu); + gpr_mu_destroy(&c->completion_mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (c->status[i].details) { GRPC_MDSTR_UNREF(c->status[i].details); -- cgit v1.2.3 From c6d6d9056d809c560688f62bf19a02463b7487fd Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Jul 2015 15:53:50 -0700 Subject: Fix host name matching not working --- src/core/surface/server.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core/surface') diff --git a/src/core/surface/server.c b/src/core/surface/server.c index a9d8940631..5af5f47781 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -441,7 +441,7 @@ static void start_new_rpc(grpc_call_element *elem) { /* TODO(ctiller): unify these two searches */ /* check for an exact match with host */ hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash); - for (i = 0; i < chand->registered_method_max_probes; i++) { + for (i = 0; i <= chand->registered_method_max_probes; i++) { rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots]; if (!rm) break; -- cgit v1.2.3 From 86062bb01cc433f6b6d383091300eb66f23b893a Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Jul 2015 16:56:21 -0700 Subject: Add missing kick --- src/core/surface/completion_queue.c | 1 + 1 file changed, 1 insertion(+) (limited to 'src/core/surface') diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 67f4443e9d..2ff47eb107 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -129,6 +129,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); cc->completed_tail = storage; + grpc_pollset_kick(&cc->pollset); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } else { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); -- cgit v1.2.3 From 2e622bc16a946f62a8feb494eba0c74fd2bb6f0d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Jul 2015 07:46:03 -0700 Subject: Add a version string and a function to retrieve it --- BUILD | 3 ++ Makefile | 2 ++ build.json | 1 + gRPC.podspec | 1 + include/grpc/grpc.h | 3 ++ src/core/surface/version.c | 41 ++++++++++++++++++++++ tools/doxygen/Doxyfile.core.internal | 1 + tools/run_tests/sources_and_headers.json | 2 ++ vsprojects/grpc/grpc.vcxproj | 2 ++ vsprojects/grpc/grpc.vcxproj.filters | 3 ++ vsprojects/grpc_unsecure/grpc_unsecure.vcxproj | 2 ++ .../grpc_unsecure/grpc_unsecure.vcxproj.filters | 3 ++ 12 files changed, 64 insertions(+) create mode 100644 src/core/surface/version.c (limited to 'src/core/surface') diff --git a/BUILD b/BUILD index 13a8784ed1..d75bd4205f 100644 --- a/BUILD +++ b/BUILD @@ -348,6 +348,7 @@ cc_library( "src/core/surface/server_chttp2.c", "src/core/surface/server_create.c", "src/core/surface/surface_trace.c", + "src/core/surface/version.c", "src/core/transport/chttp2/alpn.c", "src/core/transport/chttp2/bin_encoder.c", "src/core/transport/chttp2/frame_data.c", @@ -578,6 +579,7 @@ cc_library( "src/core/surface/server_chttp2.c", "src/core/surface/server_create.c", "src/core/surface/surface_trace.c", + "src/core/surface/version.c", "src/core/transport/chttp2/alpn.c", "src/core/transport/chttp2/bin_encoder.c", "src/core/transport/chttp2/frame_data.c", @@ -1049,6 +1051,7 @@ objc_library( "src/core/surface/server_chttp2.c", "src/core/surface/server_create.c", "src/core/surface/surface_trace.c", + "src/core/surface/version.c", "src/core/transport/chttp2/alpn.c", "src/core/transport/chttp2/bin_encoder.c", "src/core/transport/chttp2/frame_data.c", diff --git a/Makefile b/Makefile index 08171320d4..d523333b90 100644 --- a/Makefile +++ b/Makefile @@ -3313,6 +3313,7 @@ LIBGRPC_SRC = \ src/core/surface/server_chttp2.c \ src/core/surface/server_create.c \ src/core/surface/surface_trace.c \ + src/core/surface/version.c \ src/core/transport/chttp2/alpn.c \ src/core/transport/chttp2/bin_encoder.c \ src/core/transport/chttp2/frame_data.c \ @@ -3573,6 +3574,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/surface/server_chttp2.c \ src/core/surface/server_create.c \ src/core/surface/surface_trace.c \ + src/core/surface/version.c \ src/core/transport/chttp2/alpn.c \ src/core/transport/chttp2/bin_encoder.c \ src/core/transport/chttp2/frame_data.c \ diff --git a/build.json b/build.json index 7d2242b116..ca063e510e 100644 --- a/build.json +++ b/build.json @@ -287,6 +287,7 @@ "src/core/surface/server_chttp2.c", "src/core/surface/server_create.c", "src/core/surface/surface_trace.c", + "src/core/surface/version.c", "src/core/transport/chttp2/alpn.c", "src/core/transport/chttp2/bin_encoder.c", "src/core/transport/chttp2/frame_data.c", diff --git a/gRPC.podspec b/gRPC.podspec index 921a4ab096..f678819b96 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -357,6 +357,7 @@ Pod::Spec.new do |s| 'src/core/surface/server_chttp2.c', 'src/core/surface/server_create.c', 'src/core/surface/surface_trace.c', + 'src/core/surface/version.c', 'src/core/transport/chttp2/alpn.c', 'src/core/transport/chttp2/bin_encoder.c', 'src/core/transport/chttp2/frame_data.c', diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 782923d599..3c72c1db27 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -351,6 +351,9 @@ void grpc_init(void); destroyed. */ void grpc_shutdown(void); +/** Return a string representing the current version of grpc */ +const char *grpc_version_string(void); + /** Create a completion queue */ grpc_completion_queue *grpc_completion_queue_create(void); diff --git a/src/core/surface/version.c b/src/core/surface/version.c new file mode 100644 index 0000000000..4f5d648371 --- /dev/null +++ b/src/core/surface/version.c @@ -0,0 +1,41 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* This file is autogenerated from: + templates/src/core/surface/version.c.template */ + +#include + +const char *grpc_version_string(void) { + return "0.10.0.0"; +} diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index ac411e3c02..f11df21388 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -985,6 +985,7 @@ src/core/surface/server.c \ src/core/surface/server_chttp2.c \ src/core/surface/server_create.c \ src/core/surface/surface_trace.c \ +src/core/surface/version.c \ src/core/transport/chttp2/alpn.c \ src/core/transport/chttp2/bin_encoder.c \ src/core/transport/chttp2/frame_data.c \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 1fc5c20fe4..2a5fe8a73e 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -9025,6 +9025,7 @@ "src/core/surface/server_create.c", "src/core/surface/surface_trace.c", "src/core/surface/surface_trace.h", + "src/core/surface/version.c", "src/core/transport/chttp2/alpn.c", "src/core/transport/chttp2/alpn.h", "src/core/transport/chttp2/bin_encoder.c", @@ -9426,6 +9427,7 @@ "src/core/surface/server_create.c", "src/core/surface/surface_trace.c", "src/core/surface/surface_trace.h", + "src/core/surface/version.c", "src/core/transport/chttp2/alpn.c", "src/core/transport/chttp2/alpn.h", "src/core/transport/chttp2/bin_encoder.c", diff --git a/vsprojects/grpc/grpc.vcxproj b/vsprojects/grpc/grpc.vcxproj index 865e76ef10..16744b181b 100644 --- a/vsprojects/grpc/grpc.vcxproj +++ b/vsprojects/grpc/grpc.vcxproj @@ -481,6 +481,8 @@ + + diff --git a/vsprojects/grpc/grpc.vcxproj.filters b/vsprojects/grpc/grpc.vcxproj.filters index f560fef9ed..de9f20521c 100644 --- a/vsprojects/grpc/grpc.vcxproj.filters +++ b/vsprojects/grpc/grpc.vcxproj.filters @@ -316,6 +316,9 @@ src\core\surface + + src\core\surface + src\core\transport\chttp2 diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj index 4ead6f8d98..02c791f995 100644 --- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj @@ -416,6 +416,8 @@ + + diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters index 45ca1f7c33..333a71f564 100644 --- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -247,6 +247,9 @@ src\core\surface + + src\core\surface + src\core\transport\chttp2 -- cgit v1.2.3 From 1b01167425a17f57f69d2abff24a73b85fc4b425 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Jul 2015 10:41:44 -0700 Subject: Remove magic number --- src/core/surface/call.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'src/core/surface') diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 2cb2cd6e98..1e85b0f082 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -49,6 +49,17 @@ #include #include +/** The maximum number of completions possible. + Based upon the maximum number of individually queueable ops in the batch + api: + - initial metadata send + - message send + - status/close send (depending on client/server) + - initial metadata recv + - message recv + - status/close recv (depending on client/server) */ +#define MAX_CONCURRENT_COMPLETIONS 6 + typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; typedef enum { @@ -255,7 +266,7 @@ struct grpc_call { grpc_iomgr_closure on_done_bind; /** completion events - for completion queue use */ - grpc_cq_completion completions[6]; + grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS]; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) -- cgit v1.2.3 From b4580d4e0e18812bf214b2b92985741dedbd1527 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Jul 2015 10:42:34 -0700 Subject: Added static where needed --- src/core/surface/call.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 1e85b0f082..20c41cbbd7 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -367,7 +367,7 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { return call->cq; } -grpc_cq_completion *allocate_completion(grpc_call *call) { +static grpc_cq_completion *allocate_completion(grpc_call *call) { gpr_uint8 i; gpr_mu_lock(&call->completion_mu); for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) { @@ -382,7 +382,7 @@ grpc_cq_completion *allocate_completion(grpc_call *call) { abort(); } -void done_completion(void *call, grpc_cq_completion *completion) { +static void done_completion(void *call, grpc_cq_completion *completion) { grpc_call *c = call; gpr_mu_lock(&c->completion_mu); c->allocated_completions &= ~(1u << (completion - c->completions)); -- cgit v1.2.3 From 80e5f044a4f7ee5e1b81a3da8f930d216ca01849 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Sun, 12 Jul 2015 01:28:05 -0700 Subject: zero-out channel after creation --- src/core/surface/channel.c | 1 + 1 file changed, 1 insertion(+) (limited to 'src/core/surface') diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index eeae3b507c..b7826d4dfc 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -91,6 +91,7 @@ grpc_channel *grpc_channel_create_from_filters( size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); + memset(channel, 0, sizeof(*channel)); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); channel->is_client = is_client; /* decremented by grpc_channel_destroy */ -- cgit v1.2.3