diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-11-09 17:46:29 -0800 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2017-11-09 17:46:29 -0800 |
commit | 4e9265c828f0b559b5fdba04913fed46bf771399 (patch) | |
tree | 4a379fc2bdc037753cf8d81f8b86327e4bc50a42 /src/core/lib/transport | |
parent | 0ee7574732a06e8cace4e099a678f4bd5dbff679 (diff) | |
parent | d9da7387b8057f3bd99a417a5ee905377bce9296 (diff) |
Merge with master
Diffstat (limited to 'src/core/lib/transport')
24 files changed, 596 insertions, 557 deletions
diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc index 6fd6597cfe..47d65870d1 100644 --- a/src/core/lib/transport/bdp_estimator.cc +++ b/src/core/lib/transport/bdp_estimator.cc @@ -28,7 +28,7 @@ grpc_tracer_flag grpc_bdp_estimator_trace = namespace grpc_core { -BdpEstimator::BdpEstimator(const char *name) +BdpEstimator::BdpEstimator(const char* name) : ping_state_(PingState::UNSCHEDULED), accumulator_(0), estimate_(65536), @@ -45,8 +45,9 @@ grpc_millis BdpEstimator::CompletePing() { double bw = dt > 0 ? ((double)accumulator_ / dt) : 0; int start_inter_ping_delay = inter_ping_delay_; if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { - gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64 - " dt=%lf bw=%lfMbs bw_est=%lfMbs", + gpr_log(GPR_DEBUG, + "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64 + " dt=%lf bw=%lfMbs bw_est=%lfMbs", name_, accumulator_, estimate_, dt, bw / 125000.0, bw_est_ / 125000.0); } diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index 81c5b0f98b..431d4b845c 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -37,18 +37,11 @@ namespace grpc_core { class BdpEstimator { public: - explicit BdpEstimator(const char *name); + explicit BdpEstimator(const char* name); ~BdpEstimator() {} - // Returns true if a reasonable estimate could be obtained - bool EstimateBdp(int64_t *estimate_out) const { - *estimate_out = estimate_; - return true; - } - bool EstimateBandwidth(double *bw_out) const { - *bw_out = bw_est_; - return true; - } + int64_t EstimateBdp() const { return estimate_; } + double EstimateBandwidth() const { return bw_est_; } void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; } @@ -93,7 +86,7 @@ class BdpEstimator { int inter_ping_delay_; int stable_estimate_count_; double bw_est_; - const char *name_; + const char* name_; }; } // namespace grpc_core diff --git a/src/core/lib/transport/byte_stream.cc b/src/core/lib/transport/byte_stream.cc index 4f32aeeaaa..8dcb1e0bdb 100644 --- a/src/core/lib/transport/byte_stream.cc +++ b/src/core/lib/transport/byte_stream.cc @@ -25,38 +25,38 @@ #include "src/core/lib/slice/slice_internal.h" -bool grpc_byte_stream_next(grpc_byte_stream *byte_stream, size_t max_size_hint, - grpc_closure *on_complete) { +bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint, + grpc_closure* on_complete) { return byte_stream->vtable->next(byte_stream, max_size_hint, on_complete); } -grpc_error *grpc_byte_stream_pull(grpc_byte_stream *byte_stream, - grpc_slice *slice) { +grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream, + grpc_slice* slice) { return byte_stream->vtable->pull(byte_stream, slice); } -void grpc_byte_stream_shutdown(grpc_byte_stream *byte_stream, - grpc_error *error) { +void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream, + grpc_error* error) { byte_stream->vtable->shutdown(byte_stream, error); } -void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream) { +void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream) { byte_stream->vtable->destroy(byte_stream); } // grpc_slice_buffer_stream -static bool slice_buffer_stream_next(grpc_byte_stream *byte_stream, +static bool slice_buffer_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint, - grpc_closure *on_complete) { - grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + grpc_closure* on_complete) { + grpc_slice_buffer_stream* stream = (grpc_slice_buffer_stream*)byte_stream; GPR_ASSERT(stream->cursor < stream->backing_buffer->count); return true; } -static grpc_error *slice_buffer_stream_pull(grpc_byte_stream *byte_stream, - grpc_slice *slice) { - grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; +static grpc_error* slice_buffer_stream_pull(grpc_byte_stream* byte_stream, + grpc_slice* slice) { + grpc_slice_buffer_stream* stream = (grpc_slice_buffer_stream*)byte_stream; if (stream->shutdown_error != GRPC_ERROR_NONE) { return GRPC_ERROR_REF(stream->shutdown_error); } @@ -67,15 +67,15 @@ static grpc_error *slice_buffer_stream_pull(grpc_byte_stream *byte_stream, return GRPC_ERROR_NONE; } -static void slice_buffer_stream_shutdown(grpc_byte_stream *byte_stream, - grpc_error *error) { - grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; +static void slice_buffer_stream_shutdown(grpc_byte_stream* byte_stream, + grpc_error* error) { + grpc_slice_buffer_stream* stream = (grpc_slice_buffer_stream*)byte_stream; GRPC_ERROR_UNREF(stream->shutdown_error); stream->shutdown_error = error; } -static void slice_buffer_stream_destroy(grpc_byte_stream *byte_stream) { - grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; +static void slice_buffer_stream_destroy(grpc_byte_stream* byte_stream) { + grpc_slice_buffer_stream* stream = (grpc_slice_buffer_stream*)byte_stream; grpc_slice_buffer_reset_and_unref_internal(stream->backing_buffer); GRPC_ERROR_UNREF(stream->shutdown_error); } @@ -84,8 +84,8 @@ static const grpc_byte_stream_vtable slice_buffer_stream_vtable = { slice_buffer_stream_next, slice_buffer_stream_pull, slice_buffer_stream_shutdown, slice_buffer_stream_destroy}; -void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, - grpc_slice_buffer *slice_buffer, +void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream, + grpc_slice_buffer* slice_buffer, uint32_t flags) { GPR_ASSERT(slice_buffer->length <= UINT32_MAX); stream->base.length = (uint32_t)slice_buffer->length; @@ -98,30 +98,30 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, // grpc_caching_byte_stream -void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache, - grpc_byte_stream *underlying_stream) { +void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache, + grpc_byte_stream* underlying_stream) { cache->underlying_stream = underlying_stream; grpc_slice_buffer_init(&cache->cache_buffer); } -void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache *cache) { +void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache) { grpc_byte_stream_destroy(cache->underlying_stream); grpc_slice_buffer_destroy_internal(&cache->cache_buffer); } -static bool caching_byte_stream_next(grpc_byte_stream *byte_stream, +static bool caching_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint, - grpc_closure *on_complete) { - grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream; + grpc_closure* on_complete) { + grpc_caching_byte_stream* stream = (grpc_caching_byte_stream*)byte_stream; if (stream->shutdown_error != GRPC_ERROR_NONE) return true; if (stream->cursor < stream->cache->cache_buffer.count) return true; return grpc_byte_stream_next(stream->cache->underlying_stream, max_size_hint, on_complete); } -static grpc_error *caching_byte_stream_pull(grpc_byte_stream *byte_stream, - grpc_slice *slice) { - grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream; +static grpc_error* caching_byte_stream_pull(grpc_byte_stream* byte_stream, + grpc_slice* slice) { + grpc_caching_byte_stream* stream = (grpc_caching_byte_stream*)byte_stream; if (stream->shutdown_error != GRPC_ERROR_NONE) { return GRPC_ERROR_REF(stream->shutdown_error); } @@ -131,7 +131,7 @@ static grpc_error *caching_byte_stream_pull(grpc_byte_stream *byte_stream, ++stream->cursor; return GRPC_ERROR_NONE; } - grpc_error *error = + grpc_error* error = grpc_byte_stream_pull(stream->cache->underlying_stream, slice); if (error == GRPC_ERROR_NONE) { ++stream->cursor; @@ -141,16 +141,16 @@ static grpc_error *caching_byte_stream_pull(grpc_byte_stream *byte_stream, return error; } -static void caching_byte_stream_shutdown(grpc_byte_stream *byte_stream, - grpc_error *error) { - grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream; +static void caching_byte_stream_shutdown(grpc_byte_stream* byte_stream, + grpc_error* error) { + grpc_caching_byte_stream* stream = (grpc_caching_byte_stream*)byte_stream; GRPC_ERROR_UNREF(stream->shutdown_error); stream->shutdown_error = GRPC_ERROR_REF(error); grpc_byte_stream_shutdown(stream->cache->underlying_stream, error); } -static void caching_byte_stream_destroy(grpc_byte_stream *byte_stream) { - grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream; +static void caching_byte_stream_destroy(grpc_byte_stream* byte_stream) { + grpc_caching_byte_stream* stream = (grpc_caching_byte_stream*)byte_stream; GRPC_ERROR_UNREF(stream->shutdown_error); } @@ -158,8 +158,8 @@ static const grpc_byte_stream_vtable caching_byte_stream_vtable = { caching_byte_stream_next, caching_byte_stream_pull, caching_byte_stream_shutdown, caching_byte_stream_destroy}; -void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream, - grpc_byte_stream_cache *cache) { +void grpc_caching_byte_stream_init(grpc_caching_byte_stream* stream, + grpc_byte_stream_cache* cache) { memset(stream, 0, sizeof(*stream)); stream->base.length = cache->underlying_stream->length; stream->base.flags = cache->underlying_stream->flags; @@ -168,6 +168,6 @@ void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream, stream->shutdown_error = GRPC_ERROR_NONE; } -void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream) { +void grpc_caching_byte_stream_reset(grpc_caching_byte_stream* stream) { stream->cursor = 0; } diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index a2d57a680c..cec8c91624 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -35,17 +35,17 @@ extern "C" { typedef struct grpc_byte_stream grpc_byte_stream; typedef struct { - bool (*next)(grpc_byte_stream *byte_stream, size_t max_size_hint, - grpc_closure *on_complete); - grpc_error *(*pull)(grpc_byte_stream *byte_stream, grpc_slice *slice); - void (*shutdown)(grpc_byte_stream *byte_stream, grpc_error *error); - void (*destroy)(grpc_byte_stream *byte_stream); + bool (*next)(grpc_byte_stream* byte_stream, size_t max_size_hint, + grpc_closure* on_complete); + grpc_error* (*pull)(grpc_byte_stream* byte_stream, grpc_slice* slice); + void (*shutdown)(grpc_byte_stream* byte_stream, grpc_error* error); + void (*destroy)(grpc_byte_stream* byte_stream); } grpc_byte_stream_vtable; struct grpc_byte_stream { uint32_t length; uint32_t flags; - const grpc_byte_stream_vtable *vtable; + const grpc_byte_stream_vtable* vtable; }; // Returns true if the bytes are available immediately (in which case @@ -54,16 +54,16 @@ struct grpc_byte_stream { // // max_size_hint can be set as a hint as to the maximum number // of bytes that would be acceptable to read. -bool grpc_byte_stream_next(grpc_byte_stream *byte_stream, size_t max_size_hint, - grpc_closure *on_complete); +bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint, + grpc_closure* on_complete); // Returns the next slice in the byte stream when it is ready (indicated by // either grpc_byte_stream_next returning true or on_complete passed to // grpc_byte_stream_next is called). // // Once a slice is returned into *slice, it is owned by the caller. -grpc_error *grpc_byte_stream_pull(grpc_byte_stream *byte_stream, - grpc_slice *slice); +grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream, + grpc_slice* slice); // Shuts down the byte stream. // @@ -72,10 +72,10 @@ grpc_error *grpc_byte_stream_pull(grpc_byte_stream *byte_stream, // // The next call to grpc_byte_stream_pull() (if any) will return the error // passed to grpc_byte_stream_shutdown(). -void grpc_byte_stream_shutdown(grpc_byte_stream *byte_stream, - grpc_error *error); +void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream, + grpc_error* error); -void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream); +void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream); // grpc_slice_buffer_stream // @@ -85,13 +85,13 @@ void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream); typedef struct grpc_slice_buffer_stream { grpc_byte_stream base; - grpc_slice_buffer *backing_buffer; + grpc_slice_buffer* backing_buffer; size_t cursor; - grpc_error *shutdown_error; + grpc_error* shutdown_error; } grpc_slice_buffer_stream; -void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, - grpc_slice_buffer *slice_buffer, +void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream, + grpc_slice_buffer* slice_buffer, uint32_t flags); // grpc_caching_byte_stream @@ -108,29 +108,29 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, // grpc_byte_stream_cache at the same time. typedef struct { - grpc_byte_stream *underlying_stream; + grpc_byte_stream* underlying_stream; grpc_slice_buffer cache_buffer; } grpc_byte_stream_cache; // Takes ownership of underlying_stream. -void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache, - grpc_byte_stream *underlying_stream); +void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache, + grpc_byte_stream* underlying_stream); // Must not be called while still in use by a grpc_caching_byte_stream. -void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache *cache); +void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache); typedef struct { grpc_byte_stream base; - grpc_byte_stream_cache *cache; + grpc_byte_stream_cache* cache; size_t cursor; - grpc_error *shutdown_error; + grpc_error* shutdown_error; } grpc_caching_byte_stream; -void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream, - grpc_byte_stream_cache *cache); +void grpc_caching_byte_stream_init(grpc_caching_byte_stream* stream, + grpc_byte_stream_cache* cache); // Resets the byte stream to the start of the underlying stream. -void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream); +void grpc_caching_byte_stream_reset(grpc_caching_byte_stream* stream); #ifdef __cplusplus } diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index 8775eed767..0da9165b8d 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -27,10 +27,8 @@ grpc_tracer_flag grpc_connectivity_state_trace = GRPC_TRACER_INITIALIZER(false, "connectivity_state"); -const char *grpc_connectivity_state_name(grpc_connectivity_state state) { +const char* grpc_connectivity_state_name(grpc_connectivity_state state) { switch (state) { - case GRPC_CHANNEL_INIT: - return "INIT"; case GRPC_CHANNEL_IDLE: return "IDLE"; case GRPC_CHANNEL_CONNECTING: @@ -45,18 +43,18 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) { GPR_UNREACHABLE_CODE(return "UNKNOWN"); } -void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, +void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker, grpc_connectivity_state init_state, - const char *name) { + const char* name) { gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state); tracker->current_error = GRPC_ERROR_NONE; tracker->watchers = NULL; tracker->name = gpr_strdup(name); } -void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { - grpc_error *error; - grpc_connectivity_state_watcher *w; +void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker) { + grpc_error* error; + grpc_connectivity_state_watcher* w; while ((w = tracker->watchers)) { tracker->watchers = w->next; @@ -75,7 +73,7 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { } grpc_connectivity_state grpc_connectivity_state_check( - grpc_connectivity_state_tracker *tracker) { + grpc_connectivity_state_tracker* tracker) { grpc_connectivity_state cur = (grpc_connectivity_state)gpr_atm_no_barrier_load( &tracker->current_state_atm); @@ -87,7 +85,7 @@ grpc_connectivity_state grpc_connectivity_state_check( } grpc_connectivity_state grpc_connectivity_state_get( - grpc_connectivity_state_tracker *tracker, grpc_error **error) { + grpc_connectivity_state_tracker* tracker, grpc_error** error) { grpc_connectivity_state cur = (grpc_connectivity_state)gpr_atm_no_barrier_load( &tracker->current_state_atm); @@ -102,13 +100,13 @@ grpc_connectivity_state grpc_connectivity_state_get( } bool grpc_connectivity_state_has_watchers( - grpc_connectivity_state_tracker *connectivity_state) { + grpc_connectivity_state_tracker* connectivity_state) { return connectivity_state->watchers != NULL; } bool grpc_connectivity_state_notify_on_state_change( - grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, - grpc_closure *notify) { + grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current, + grpc_closure* notify) { grpc_connectivity_state cur = (grpc_connectivity_state)gpr_atm_no_barrier_load( &tracker->current_state_atm); @@ -123,7 +121,7 @@ bool grpc_connectivity_state_notify_on_state_change( } } if (current == NULL) { - grpc_connectivity_state_watcher *w = tracker->watchers; + grpc_connectivity_state_watcher* w = tracker->watchers; if (w != NULL && w->notify == notify) { GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED); tracker->watchers = w->next; @@ -131,7 +129,7 @@ bool grpc_connectivity_state_notify_on_state_change( return false; } while (w != NULL) { - grpc_connectivity_state_watcher *rm_candidate = w->next; + grpc_connectivity_state_watcher* rm_candidate = w->next; if (rm_candidate != NULL && rm_candidate->notify == notify) { GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED); w->next = w->next->next; @@ -146,8 +144,8 @@ bool grpc_connectivity_state_notify_on_state_change( *current = cur; GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_REF(tracker->current_error)); } else { - grpc_connectivity_state_watcher *w = - (grpc_connectivity_state_watcher *)gpr_malloc(sizeof(*w)); + grpc_connectivity_state_watcher* w = + (grpc_connectivity_state_watcher*)gpr_malloc(sizeof(*w)); w->current = current; w->notify = notify; w->next = tracker->watchers; @@ -157,21 +155,20 @@ bool grpc_connectivity_state_notify_on_state_change( } } -void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, +void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker, grpc_connectivity_state state, - grpc_error *error, const char *reason) { + grpc_error* error, const char* reason) { grpc_connectivity_state cur = (grpc_connectivity_state)gpr_atm_no_barrier_load( &tracker->current_state_atm); - grpc_connectivity_state_watcher *w; + grpc_connectivity_state_watcher* w; if (GRPC_TRACER_ON(grpc_connectivity_state_trace)) { - const char *error_string = grpc_error_string(error); + const char* error_string = grpc_error_string(error); gpr_log(GPR_DEBUG, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker, tracker->name, grpc_connectivity_state_name(cur), grpc_connectivity_state_name(state), reason, error, error_string); } switch (state) { - case GRPC_CHANNEL_INIT: case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_READY: diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index b2dece3478..83ce99e6d6 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -29,62 +29,62 @@ extern "C" { typedef struct grpc_connectivity_state_watcher { /** we keep watchers in a linked list */ - struct grpc_connectivity_state_watcher *next; + struct grpc_connectivity_state_watcher* next; /** closure to notify on change */ - grpc_closure *notify; + grpc_closure* notify; /** the current state as believed by the watcher */ - grpc_connectivity_state *current; + grpc_connectivity_state* current; } grpc_connectivity_state_watcher; typedef struct { /** current grpc_connectivity_state */ gpr_atm current_state_atm; /** error associated with state */ - grpc_error *current_error; + grpc_error* current_error; /** all our watchers */ - grpc_connectivity_state_watcher *watchers; + grpc_connectivity_state_watcher* watchers; /** a name to help debugging */ - char *name; + char* name; } grpc_connectivity_state_tracker; extern grpc_tracer_flag grpc_connectivity_state_trace; /** enum --> string conversion */ -const char *grpc_connectivity_state_name(grpc_connectivity_state state); +const char* grpc_connectivity_state_name(grpc_connectivity_state state); -void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, +void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker, grpc_connectivity_state init_state, - const char *name); -void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker); + const char* name); +void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker); /** Set connectivity state; not thread safe; access must be serialized with an * external lock */ -void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, +void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker, grpc_connectivity_state state, - grpc_error *associated_error, - const char *reason); + grpc_error* associated_error, + const char* reason); /** Return true if this connectivity state has watchers. Access must be serialized with an external lock. */ bool grpc_connectivity_state_has_watchers( - grpc_connectivity_state_tracker *tracker); + grpc_connectivity_state_tracker* tracker); /** Return the last seen connectivity state. No need to synchronize access. */ grpc_connectivity_state grpc_connectivity_state_check( - grpc_connectivity_state_tracker *tracker); + grpc_connectivity_state_tracker* tracker); /** Return the last seen connectivity state, and the associated error. Access must be serialized with an external lock. */ grpc_connectivity_state grpc_connectivity_state_get( - grpc_connectivity_state_tracker *tracker, grpc_error **error); + grpc_connectivity_state_tracker* tracker, grpc_error** error); /** Return 1 if the channel should start connecting, 0 otherwise. If current==NULL cancel notify if it is already queued (success==0 in that case). Access must be serialized with an external lock. */ bool grpc_connectivity_state_notify_on_state_change( - grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, - grpc_closure *notify); + grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current, + grpc_closure* notify); #ifdef __cplusplus } diff --git a/src/core/lib/transport/error_utils.cc b/src/core/lib/transport/error_utils.cc index 276fdf1cdc..eb7cf192d8 100644 --- a/src/core/lib/transport/error_utils.cc +++ b/src/core/lib/transport/error_utils.cc @@ -21,7 +21,7 @@ #include "src/core/lib/iomgr/error_internal.h" #include "src/core/lib/transport/status_conversion.h" -static grpc_error *recursively_find_error_with_field(grpc_error *error, +static grpc_error* recursively_find_error_with_field(grpc_error* error, grpc_error_ints which) { // If the error itself has a status code, return it. if (grpc_error_get_int(error, which, NULL)) { @@ -31,20 +31,20 @@ static grpc_error *recursively_find_error_with_field(grpc_error *error, // Otherwise, search through its children. uint8_t slot = error->first_err; while (slot != UINT8_MAX) { - grpc_linked_error *lerr = (grpc_linked_error *)(error->arena + slot); - grpc_error *result = recursively_find_error_with_field(lerr->err, which); + grpc_linked_error* lerr = (grpc_linked_error*)(error->arena + slot); + grpc_error* result = recursively_find_error_with_field(lerr->err, which); if (result) return result; slot = lerr->next; } return NULL; } -void grpc_error_get_status(grpc_error *error, grpc_millis deadline, - grpc_status_code *code, grpc_slice *slice, - grpc_http2_error_code *http_error) { +void grpc_error_get_status(grpc_error* error, grpc_millis deadline, + grpc_status_code* code, grpc_slice* slice, + grpc_http2_error_code* http_error) { // Start with the parent error and recurse through the tree of children // until we find the first one that has a status code. - grpc_error *found_error = + grpc_error* found_error = recursively_find_error_with_field(error, GRPC_ERROR_INT_GRPC_STATUS); if (found_error == NULL) { /// If no grpc-status exists, retry through the tree to find a http2 error @@ -93,13 +93,13 @@ void grpc_error_get_status(grpc_error *error, grpc_millis deadline, if (found_error == NULL) found_error = error; } -bool grpc_error_has_clear_grpc_status(grpc_error *error) { +bool grpc_error_has_clear_grpc_status(grpc_error* error) { if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) { return true; } uint8_t slot = error->first_err; while (slot != UINT8_MAX) { - grpc_linked_error *lerr = (grpc_linked_error *)(error->arena + slot); + grpc_linked_error* lerr = (grpc_linked_error*)(error->arena + slot); if (grpc_error_has_clear_grpc_status(lerr->err)) { return true; } diff --git a/src/core/lib/transport/error_utils.h b/src/core/lib/transport/error_utils.h index 94012450f7..73ca7cf3ab 100644 --- a/src/core/lib/transport/error_utils.h +++ b/src/core/lib/transport/error_utils.h @@ -33,15 +33,15 @@ extern "C" { /// All attributes are pulled from the same child error. If any of the /// attributes (code, msg, http_status) are unneeded, they can be passed as /// NULL. -void grpc_error_get_status(grpc_error *error, grpc_millis deadline, - grpc_status_code *code, grpc_slice *slice, - grpc_http2_error_code *http_status); +void grpc_error_get_status(grpc_error* error, grpc_millis deadline, + grpc_status_code* code, grpc_slice* slice, + grpc_http2_error_code* http_status); /// A utility function to check whether there is a clear status code that /// doesn't need to be guessed in \a error. This means that \a error or some /// child has GRPC_ERROR_INT_GRPC_STATUS set, or that it is GRPC_ERROR_NONE or /// GRPC_ERROR_CANCELLED -bool grpc_error_has_clear_grpc_status(grpc_error *error); +bool grpc_error_has_clear_grpc_status(grpc_error* error); #ifdef __cplusplus } diff --git a/src/core/lib/transport/metadata.cc b/src/core/lib/transport/metadata.cc index 9f88917daa..1522e789fc 100644 --- a/src/core/lib/transport/metadata.cc +++ b/src/core/lib/transport/metadata.cc @@ -67,7 +67,7 @@ grpc_tracer_flag grpc_trace_metadata = #define TABLE_IDX(hash, capacity) (((hash) >> (LOG2_SHARD_COUNT)) % (capacity)) #define SHARD_IDX(hash) ((hash) & ((1 << (LOG2_SHARD_COUNT)) - 1)) -typedef void (*destroy_user_data_func)(void *user_data); +typedef void (*destroy_user_data_func)(void* user_data); /* Shadow structure for grpc_mdelem_data for interned elements */ typedef struct interned_metadata { @@ -82,7 +82,7 @@ typedef struct interned_metadata { gpr_atm destroy_user_data; gpr_atm user_data; - struct interned_metadata *bucket_next; + struct interned_metadata* bucket_next; } interned_metadata; /* Shadow structure for grpc_mdelem_data for allocated elements */ @@ -97,7 +97,7 @@ typedef struct allocated_metadata { typedef struct mdtab_shard { gpr_mu mu; - interned_metadata **elems; + interned_metadata** elems; size_t count; size_t capacity; /** Estimate of the number of unreferenced mdelems in the hash table. @@ -108,24 +108,24 @@ typedef struct mdtab_shard { static mdtab_shard g_shards[SHARD_COUNT]; -static void gc_mdtab(mdtab_shard *shard); +static void gc_mdtab(mdtab_shard* shard); void grpc_mdctx_global_init(void) { /* initialize shards */ for (size_t i = 0; i < SHARD_COUNT; i++) { - mdtab_shard *shard = &g_shards[i]; + mdtab_shard* shard = &g_shards[i]; gpr_mu_init(&shard->mu); shard->count = 0; gpr_atm_no_barrier_store(&shard->free_estimate, 0); shard->capacity = INITIAL_SHARD_CAPACITY; - shard->elems = (interned_metadata **)gpr_zalloc(sizeof(*shard->elems) * - shard->capacity); + shard->elems = (interned_metadata**)gpr_zalloc(sizeof(*shard->elems) * + shard->capacity); } } void grpc_mdctx_global_shutdown() { for (size_t i = 0; i < SHARD_COUNT; i++) { - mdtab_shard *shard = &g_shards[i]; + mdtab_shard* shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); gc_mdtab(shard); /* TODO(ctiller): GPR_ASSERT(shard->count == 0); */ @@ -146,14 +146,14 @@ static int is_mdelem_static(grpc_mdelem e) { &grpc_static_mdelem_table[GRPC_STATIC_MDELEM_COUNT]; } -static void ref_md_locked(mdtab_shard *shard, - interned_metadata *md DEBUG_ARGS) { +static void ref_md_locked(mdtab_shard* shard, + interned_metadata* md DEBUG_ARGS) { #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_metadata)) { - char *key_str = grpc_slice_to_c_string(md->key); - char *value_str = grpc_slice_to_c_string(md->value); + char* key_str = grpc_slice_to_c_string(md->key); + char* value_str = grpc_slice_to_c_string(md->value); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "ELM REF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'", (void *)md, + "ELM REF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'", (void*)md, gpr_atm_no_barrier_load(&md->refcnt), gpr_atm_no_barrier_load(&md->refcnt) + 1, key_str, value_str); gpr_free(key_str); @@ -165,9 +165,9 @@ static void ref_md_locked(mdtab_shard *shard, } } -static void gc_mdtab(mdtab_shard *shard) { +static void gc_mdtab(mdtab_shard* shard) { size_t i; - interned_metadata **prev_next; + interned_metadata** prev_next; interned_metadata *md, *next; gpr_atm num_freed = 0; @@ -175,7 +175,7 @@ static void gc_mdtab(mdtab_shard *shard) { for (i = 0; i < shard->capacity; i++) { prev_next = &shard->elems[i]; for (md = shard->elems[i]; md; md = next) { - void *user_data = (void *)gpr_atm_no_barrier_load(&md->user_data); + void* user_data = (void*)gpr_atm_no_barrier_load(&md->user_data); next = md->bucket_next; if (gpr_atm_acq_load(&md->refcnt) == 0) { grpc_slice_unref_internal(md->key); @@ -197,17 +197,17 @@ static void gc_mdtab(mdtab_shard *shard) { GPR_TIMER_END("gc_mdtab", 0); } -static void grow_mdtab(mdtab_shard *shard) { +static void grow_mdtab(mdtab_shard* shard) { size_t capacity = shard->capacity * 2; size_t i; - interned_metadata **mdtab; + interned_metadata** mdtab; interned_metadata *md, *next; uint32_t hash; GPR_TIMER_BEGIN("grow_mdtab", 0); mdtab = - (interned_metadata **)gpr_zalloc(sizeof(interned_metadata *) * capacity); + (interned_metadata**)gpr_zalloc(sizeof(interned_metadata*) * capacity); for (i = 0; i < shard->capacity; i++) { for (md = shard->elems[i]; md; md = next) { @@ -228,7 +228,7 @@ static void grow_mdtab(mdtab_shard *shard) { GPR_TIMER_END("grow_mdtab", 0); } -static void rehash_mdtab(mdtab_shard *shard) { +static void rehash_mdtab(mdtab_shard* shard) { if (gpr_atm_no_barrier_load(&shard->free_estimate) > (gpr_atm)(shard->capacity / 4)) { gc_mdtab(shard); @@ -239,24 +239,24 @@ static void rehash_mdtab(mdtab_shard *shard) { grpc_mdelem grpc_mdelem_create( grpc_slice key, grpc_slice value, - grpc_mdelem_data *compatible_external_backing_store) { + grpc_mdelem_data* compatible_external_backing_store) { if (!grpc_slice_is_interned(key) || !grpc_slice_is_interned(value)) { if (compatible_external_backing_store != NULL) { return GRPC_MAKE_MDELEM(compatible_external_backing_store, GRPC_MDELEM_STORAGE_EXTERNAL); } - allocated_metadata *allocated = - (allocated_metadata *)gpr_malloc(sizeof(*allocated)); + allocated_metadata* allocated = + (allocated_metadata*)gpr_malloc(sizeof(*allocated)); allocated->key = grpc_slice_ref_internal(key); allocated->value = grpc_slice_ref_internal(value); gpr_atm_rel_store(&allocated->refcnt, 1); #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_metadata)) { - char *key_str = grpc_slice_to_c_string(allocated->key); - char *value_str = grpc_slice_to_c_string(allocated->value); + char* key_str = grpc_slice_to_c_string(allocated->key); + char* value_str = grpc_slice_to_c_string(allocated->value); gpr_log(GPR_DEBUG, "ELM ALLOC:%p:%" PRIdPTR ": '%s' = '%s'", - (void *)allocated, gpr_atm_no_barrier_load(&allocated->refcnt), + (void*)allocated, gpr_atm_no_barrier_load(&allocated->refcnt), key_str, value_str); gpr_free(key_str); gpr_free(value_str); @@ -276,8 +276,8 @@ grpc_mdelem grpc_mdelem_create( uint32_t hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash(key), grpc_slice_hash(value)); - interned_metadata *md; - mdtab_shard *shard = &g_shards[SHARD_IDX(hash)]; + interned_metadata* md; + mdtab_shard* shard = &g_shards[SHARD_IDX(hash)]; size_t idx; GPR_TIMER_BEGIN("grpc_mdelem_from_metadata_strings", 0); @@ -296,7 +296,7 @@ grpc_mdelem grpc_mdelem_create( } /* not found: create a new pair */ - md = (interned_metadata *)gpr_malloc(sizeof(interned_metadata)); + md = (interned_metadata*)gpr_malloc(sizeof(interned_metadata)); gpr_atm_rel_store(&md->refcnt, 1); md->key = grpc_slice_ref_internal(key); md->value = grpc_slice_ref_internal(value); @@ -307,9 +307,9 @@ grpc_mdelem grpc_mdelem_create( gpr_mu_init(&md->mu_user_data); #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_metadata)) { - char *key_str = grpc_slice_to_c_string(md->key); - char *value_str = grpc_slice_to_c_string(md->value); - gpr_log(GPR_DEBUG, "ELM NEW:%p:%" PRIdPTR ": '%s' = '%s'", (void *)md, + char* key_str = grpc_slice_to_c_string(md->key); + char* value_str = grpc_slice_to_c_string(md->value); + gpr_log(GPR_DEBUG, "ELM NEW:%p:%" PRIdPTR ": '%s' = '%s'", (void*)md, gpr_atm_no_barrier_load(&md->refcnt), key_str, value_str); gpr_free(key_str); gpr_free(value_str); @@ -335,14 +335,14 @@ grpc_mdelem grpc_mdelem_from_slices(grpc_slice key, grpc_slice value) { return out; } -grpc_mdelem grpc_mdelem_from_grpc_metadata(grpc_metadata *metadata) { +grpc_mdelem grpc_mdelem_from_grpc_metadata(grpc_metadata* metadata) { bool changed = false; grpc_slice key_slice = grpc_slice_maybe_static_intern(metadata->key, &changed); grpc_slice value_slice = grpc_slice_maybe_static_intern(metadata->value, &changed); return grpc_mdelem_create(key_slice, value_slice, - changed ? NULL : (grpc_mdelem_data *)metadata); + changed ? NULL : (grpc_mdelem_data*)metadata); } static size_t get_base64_encoded_size(size_t raw_length) { @@ -350,11 +350,14 @@ static size_t get_base64_encoded_size(size_t raw_length) { return raw_length / 3 * 4 + tail_xtra[raw_length % 3]; } -size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem) { +size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem, + bool use_true_binary_metadata) { size_t overhead_and_key = 32 + GRPC_SLICE_LENGTH(GRPC_MDKEY(elem)); size_t value_len = GRPC_SLICE_LENGTH(GRPC_MDVALUE(elem)); if (grpc_is_binary_header(GRPC_MDKEY(elem))) { - return overhead_and_key + get_base64_encoded_size(value_len); + return overhead_and_key + (use_true_binary_metadata + ? value_len + 1 + : get_base64_encoded_size(value_len)); } else { return overhead_and_key + value_len; } @@ -366,14 +369,14 @@ grpc_mdelem grpc_mdelem_ref(grpc_mdelem gmd DEBUG_ARGS) { case GRPC_MDELEM_STORAGE_STATIC: break; case GRPC_MDELEM_STORAGE_INTERNED: { - interned_metadata *md = (interned_metadata *)GRPC_MDELEM_DATA(gmd); + interned_metadata* md = (interned_metadata*)GRPC_MDELEM_DATA(gmd); #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_metadata)) { - char *key_str = grpc_slice_to_c_string(md->key); - char *value_str = grpc_slice_to_c_string(md->value); + char* key_str = grpc_slice_to_c_string(md->key); + char* value_str = grpc_slice_to_c_string(md->value); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "ELM REF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'", - (void *)md, gpr_atm_no_barrier_load(&md->refcnt), + (void*)md, gpr_atm_no_barrier_load(&md->refcnt), gpr_atm_no_barrier_load(&md->refcnt) + 1, key_str, value_str); gpr_free(key_str); gpr_free(value_str); @@ -388,14 +391,14 @@ grpc_mdelem grpc_mdelem_ref(grpc_mdelem gmd DEBUG_ARGS) { break; } case GRPC_MDELEM_STORAGE_ALLOCATED: { - allocated_metadata *md = (allocated_metadata *)GRPC_MDELEM_DATA(gmd); + allocated_metadata* md = (allocated_metadata*)GRPC_MDELEM_DATA(gmd); #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_metadata)) { - char *key_str = grpc_slice_to_c_string(md->key); - char *value_str = grpc_slice_to_c_string(md->value); + char* key_str = grpc_slice_to_c_string(md->key); + char* value_str = grpc_slice_to_c_string(md->value); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "ELM REF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'", - (void *)md, gpr_atm_no_barrier_load(&md->refcnt), + (void*)md, gpr_atm_no_barrier_load(&md->refcnt), gpr_atm_no_barrier_load(&md->refcnt) + 1, key_str, value_str); gpr_free(key_str); gpr_free(value_str); @@ -418,14 +421,14 @@ void grpc_mdelem_unref(grpc_mdelem gmd DEBUG_ARGS) { case GRPC_MDELEM_STORAGE_STATIC: break; case GRPC_MDELEM_STORAGE_INTERNED: { - interned_metadata *md = (interned_metadata *)GRPC_MDELEM_DATA(gmd); + interned_metadata* md = (interned_metadata*)GRPC_MDELEM_DATA(gmd); #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_metadata)) { - char *key_str = grpc_slice_to_c_string(md->key); - char *value_str = grpc_slice_to_c_string(md->value); + char* key_str = grpc_slice_to_c_string(md->key); + char* value_str = grpc_slice_to_c_string(md->value); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "ELM UNREF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'", - (void *)md, gpr_atm_no_barrier_load(&md->refcnt), + (void*)md, gpr_atm_no_barrier_load(&md->refcnt), gpr_atm_no_barrier_load(&md->refcnt) - 1, key_str, value_str); gpr_free(key_str); gpr_free(value_str); @@ -438,20 +441,20 @@ void grpc_mdelem_unref(grpc_mdelem gmd DEBUG_ARGS) { if (1 == prev_refcount) { /* once the refcount hits zero, some other thread can come along and free md at any time: it's unsafe from this point on to access it */ - mdtab_shard *shard = &g_shards[SHARD_IDX(hash)]; + mdtab_shard* shard = &g_shards[SHARD_IDX(hash)]; gpr_atm_no_barrier_fetch_add(&shard->free_estimate, 1); } break; } case GRPC_MDELEM_STORAGE_ALLOCATED: { - allocated_metadata *md = (allocated_metadata *)GRPC_MDELEM_DATA(gmd); + allocated_metadata* md = (allocated_metadata*)GRPC_MDELEM_DATA(gmd); #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_metadata)) { - char *key_str = grpc_slice_to_c_string(md->key); - char *value_str = grpc_slice_to_c_string(md->value); + char* key_str = grpc_slice_to_c_string(md->key); + char* value_str = grpc_slice_to_c_string(md->value); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "ELM UNREF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'", - (void *)md, gpr_atm_no_barrier_load(&md->refcnt), + (void*)md, gpr_atm_no_barrier_load(&md->refcnt), gpr_atm_no_barrier_load(&md->refcnt) - 1, key_str, value_str); gpr_free(key_str); gpr_free(value_str); @@ -469,19 +472,19 @@ void grpc_mdelem_unref(grpc_mdelem gmd DEBUG_ARGS) { } } -void *grpc_mdelem_get_user_data(grpc_mdelem md, void (*destroy_func)(void *)) { +void* grpc_mdelem_get_user_data(grpc_mdelem md, void (*destroy_func)(void*)) { switch (GRPC_MDELEM_STORAGE(md)) { case GRPC_MDELEM_STORAGE_EXTERNAL: case GRPC_MDELEM_STORAGE_ALLOCATED: return NULL; case GRPC_MDELEM_STORAGE_STATIC: - return (void *)grpc_static_mdelem_user_data[GRPC_MDELEM_DATA(md) - - grpc_static_mdelem_table]; + return (void*)grpc_static_mdelem_user_data[GRPC_MDELEM_DATA(md) - + grpc_static_mdelem_table]; case GRPC_MDELEM_STORAGE_INTERNED: { - interned_metadata *im = (interned_metadata *)GRPC_MDELEM_DATA(md); - void *result; + interned_metadata* im = (interned_metadata*)GRPC_MDELEM_DATA(md); + void* result; if (gpr_atm_acq_load(&im->destroy_user_data) == (gpr_atm)destroy_func) { - return (void *)gpr_atm_no_barrier_load(&im->user_data); + return (void*)gpr_atm_no_barrier_load(&im->user_data); } else { return NULL; } @@ -491,8 +494,8 @@ void *grpc_mdelem_get_user_data(grpc_mdelem md, void (*destroy_func)(void *)) { GPR_UNREACHABLE_CODE(return NULL); } -void *grpc_mdelem_set_user_data(grpc_mdelem md, void (*destroy_func)(void *), - void *user_data) { +void* grpc_mdelem_set_user_data(grpc_mdelem md, void (*destroy_func)(void*), + void* user_data) { switch (GRPC_MDELEM_STORAGE(md)) { case GRPC_MDELEM_STORAGE_EXTERNAL: case GRPC_MDELEM_STORAGE_ALLOCATED: @@ -500,10 +503,10 @@ void *grpc_mdelem_set_user_data(grpc_mdelem md, void (*destroy_func)(void *), return NULL; case GRPC_MDELEM_STORAGE_STATIC: destroy_func(user_data); - return (void *)grpc_static_mdelem_user_data[GRPC_MDELEM_DATA(md) - - grpc_static_mdelem_table]; + return (void*)grpc_static_mdelem_user_data[GRPC_MDELEM_DATA(md) - + grpc_static_mdelem_table]; case GRPC_MDELEM_STORAGE_INTERNED: { - interned_metadata *im = (interned_metadata *)GRPC_MDELEM_DATA(md); + interned_metadata* im = (interned_metadata*)GRPC_MDELEM_DATA(md); GPR_ASSERT(!is_mdelem_static(md)); GPR_ASSERT((user_data == NULL) == (destroy_func == NULL)); gpr_mu_lock(&im->mu_user_data); @@ -513,7 +516,7 @@ void *grpc_mdelem_set_user_data(grpc_mdelem md, void (*destroy_func)(void *), if (destroy_func != NULL) { destroy_func(user_data); } - return (void *)gpr_atm_no_barrier_load(&im->user_data); + return (void*)gpr_atm_no_barrier_load(&im->user_data); } gpr_atm_no_barrier_store(&im->user_data, (gpr_atm)user_data); gpr_atm_rel_store(&im->destroy_user_data, (gpr_atm)destroy_func); diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h index d0465d7215..552eabf9b7 100644 --- a/src/core/lib/transport/metadata.h +++ b/src/core/lib/transport/metadata.h @@ -98,8 +98,7 @@ struct grpc_mdelem { uintptr_t payload; }; -#define GRPC_MDELEM_DATA(md) \ - ((grpc_mdelem_data *)((md).payload & ~(uintptr_t)3)) +#define GRPC_MDELEM_DATA(md) ((grpc_mdelem_data*)((md).payload & ~(uintptr_t)3)) #define GRPC_MDELEM_STORAGE(md) \ ((grpc_mdelem_data_storage)((md).payload & (uintptr_t)3)) #ifdef __cplusplus @@ -118,7 +117,7 @@ grpc_mdelem grpc_mdelem_from_slices(grpc_slice key, grpc_slice value); /* Cheaply convert a grpc_metadata to a grpc_mdelem; may use the grpc_metadata object as backing storage (so lifetimes should align) */ -grpc_mdelem grpc_mdelem_from_grpc_metadata(grpc_metadata *metadata); +grpc_mdelem grpc_mdelem_from_grpc_metadata(grpc_metadata* metadata); /* Does not unref the slices; if a new non-interned mdelem is needed, allocates one if compatible_external_backing_store is NULL, or uses @@ -126,24 +125,24 @@ grpc_mdelem grpc_mdelem_from_grpc_metadata(grpc_metadata *metadata); users responsibility to ensure that it outlives usage) */ grpc_mdelem grpc_mdelem_create( grpc_slice key, grpc_slice value, - grpc_mdelem_data *compatible_external_backing_store); + grpc_mdelem_data* compatible_external_backing_store); bool grpc_mdelem_eq(grpc_mdelem a, grpc_mdelem b); -size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem); +size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem, + bool use_true_binary_metadata); /* Mutator and accessor for grpc_mdelem user data. The destructor function is used as a type tag and is checked during user_data fetch. */ -void *grpc_mdelem_get_user_data(grpc_mdelem md, - void (*if_destroy_func)(void *)); -void *grpc_mdelem_set_user_data(grpc_mdelem md, void (*destroy_func)(void *), - void *user_data); +void* grpc_mdelem_get_user_data(grpc_mdelem md, void (*if_destroy_func)(void*)); +void* grpc_mdelem_set_user_data(grpc_mdelem md, void (*destroy_func)(void*), + void* user_data); #ifndef NDEBUG #define GRPC_MDELEM_REF(s) grpc_mdelem_ref((s), __FILE__, __LINE__) #define GRPC_MDELEM_UNREF(s) grpc_mdelem_unref((s), __FILE__, __LINE__) -grpc_mdelem grpc_mdelem_ref(grpc_mdelem md, const char *file, int line); -void grpc_mdelem_unref(grpc_mdelem md, const char *file, int line); +grpc_mdelem grpc_mdelem_ref(grpc_mdelem md, const char* file, int line); +void grpc_mdelem_unref(grpc_mdelem md, const char* file, int line); #else #define GRPC_MDELEM_REF(s) grpc_mdelem_ref((s)) #define GRPC_MDELEM_UNREF(s) grpc_mdelem_unref((s)) diff --git a/src/core/lib/transport/metadata_batch.cc b/src/core/lib/transport/metadata_batch.cc index 075a03c0f9..85f1409a1e 100644 --- a/src/core/lib/transport/metadata_batch.cc +++ b/src/core/lib/transport/metadata_batch.cc @@ -28,9 +28,9 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" -static void assert_valid_list(grpc_mdelem_list *list) { +static void assert_valid_list(grpc_mdelem_list* list) { #ifndef NDEBUG - grpc_linked_mdelem *l; + grpc_linked_mdelem* l; GPR_ASSERT((list->head == NULL) == (list->tail == NULL)); if (!list->head) return; @@ -51,9 +51,9 @@ static void assert_valid_list(grpc_mdelem_list *list) { #endif /* NDEBUG */ } -static void assert_valid_callouts(grpc_metadata_batch *batch) { +static void assert_valid_callouts(grpc_metadata_batch* batch) { #ifndef NDEBUG - for (grpc_linked_mdelem *l = batch->list.head; l != NULL; l = l->next) { + for (grpc_linked_mdelem* l = batch->list.head; l != NULL; l = l->next) { grpc_slice key_interned = grpc_slice_intern(GRPC_MDKEY(l->md)); grpc_metadata_batch_callouts_index callout_idx = GRPC_BATCH_INDEX_OF(key_interned); @@ -66,37 +66,37 @@ static void assert_valid_callouts(grpc_metadata_batch *batch) { } #ifndef NDEBUG -void grpc_metadata_batch_assert_ok(grpc_metadata_batch *batch) { +void grpc_metadata_batch_assert_ok(grpc_metadata_batch* batch) { assert_valid_list(&batch->list); } #endif /* NDEBUG */ -void grpc_metadata_batch_init(grpc_metadata_batch *batch) { +void grpc_metadata_batch_init(grpc_metadata_batch* batch) { memset(batch, 0, sizeof(*batch)); batch->deadline = GRPC_MILLIS_INF_FUTURE; } -void grpc_metadata_batch_destroy(grpc_metadata_batch *batch) { - grpc_linked_mdelem *l; +void grpc_metadata_batch_destroy(grpc_metadata_batch* batch) { + grpc_linked_mdelem* l; for (l = batch->list.head; l; l = l->next) { GRPC_MDELEM_UNREF(l->md); } } -grpc_error *grpc_attach_md_to_error(grpc_error *src, grpc_mdelem md) { - grpc_error *out = grpc_error_set_str( +grpc_error* grpc_attach_md_to_error(grpc_error* src, grpc_mdelem md) { + grpc_error* out = grpc_error_set_str( grpc_error_set_str(src, GRPC_ERROR_STR_KEY, grpc_slice_ref_internal(GRPC_MDKEY(md))), GRPC_ERROR_STR_VALUE, grpc_slice_ref_internal(GRPC_MDVALUE(md))); return out; } -static grpc_error *maybe_link_callout(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage) +static grpc_error* maybe_link_callout(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage) GRPC_MUST_USE_RESULT; -static grpc_error *maybe_link_callout(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage) { +static grpc_error* maybe_link_callout(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage) { grpc_metadata_batch_callouts_index idx = GRPC_BATCH_INDEX_OF(GRPC_MDKEY(storage->md)); if (idx == GRPC_BATCH_CALLOUTS_COUNT) { @@ -112,8 +112,8 @@ static grpc_error *maybe_link_callout(grpc_metadata_batch *batch, storage->md); } -static void maybe_unlink_callout(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage) { +static void maybe_unlink_callout(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage) { grpc_metadata_batch_callouts_index idx = GRPC_BATCH_INDEX_OF(GRPC_MDKEY(storage->md)); if (idx == GRPC_BATCH_CALLOUTS_COUNT) { @@ -124,15 +124,15 @@ static void maybe_unlink_callout(grpc_metadata_batch *batch, batch->idx.array[idx] = NULL; } -grpc_error *grpc_metadata_batch_add_head(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage, +grpc_error* grpc_metadata_batch_add_head(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage, grpc_mdelem elem_to_add) { GPR_ASSERT(!GRPC_MDISNULL(elem_to_add)); storage->md = elem_to_add; return grpc_metadata_batch_link_head(batch, storage); } -static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { +static void link_head(grpc_mdelem_list* list, grpc_linked_mdelem* storage) { assert_valid_list(list); GPR_ASSERT(!GRPC_MDISNULL(storage->md)); storage->prev = NULL; @@ -147,10 +147,10 @@ static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { assert_valid_list(list); } -grpc_error *grpc_metadata_batch_link_head(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage) { +grpc_error* grpc_metadata_batch_link_head(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage) { assert_valid_callouts(batch); - grpc_error *err = maybe_link_callout(batch, storage); + grpc_error* err = maybe_link_callout(batch, storage); if (err != GRPC_ERROR_NONE) { assert_valid_callouts(batch); return err; @@ -160,15 +160,15 @@ grpc_error *grpc_metadata_batch_link_head(grpc_metadata_batch *batch, return GRPC_ERROR_NONE; } -grpc_error *grpc_metadata_batch_add_tail(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage, +grpc_error* grpc_metadata_batch_add_tail(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage, grpc_mdelem elem_to_add) { GPR_ASSERT(!GRPC_MDISNULL(elem_to_add)); storage->md = elem_to_add; return grpc_metadata_batch_link_tail(batch, storage); } -static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { +static void link_tail(grpc_mdelem_list* list, grpc_linked_mdelem* storage) { assert_valid_list(list); GPR_ASSERT(!GRPC_MDISNULL(storage->md)); storage->prev = list->tail; @@ -184,10 +184,10 @@ static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { assert_valid_list(list); } -grpc_error *grpc_metadata_batch_link_tail(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage) { +grpc_error* grpc_metadata_batch_link_tail(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage) { assert_valid_callouts(batch); - grpc_error *err = maybe_link_callout(batch, storage); + grpc_error* err = maybe_link_callout(batch, storage); if (err != GRPC_ERROR_NONE) { assert_valid_callouts(batch); return err; @@ -197,8 +197,8 @@ grpc_error *grpc_metadata_batch_link_tail(grpc_metadata_batch *batch, return GRPC_ERROR_NONE; } -static void unlink_storage(grpc_mdelem_list *list, - grpc_linked_mdelem *storage) { +static void unlink_storage(grpc_mdelem_list* list, + grpc_linked_mdelem* storage) { assert_valid_list(list); if (storage->prev != NULL) { storage->prev->next = storage->next; @@ -214,8 +214,8 @@ static void unlink_storage(grpc_mdelem_list *list, assert_valid_list(list); } -void grpc_metadata_batch_remove(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage) { +void grpc_metadata_batch_remove(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage) { assert_valid_callouts(batch); maybe_unlink_callout(batch, storage); unlink_storage(&batch->list, storage); @@ -223,7 +223,7 @@ void grpc_metadata_batch_remove(grpc_metadata_batch *batch, assert_valid_callouts(batch); } -void grpc_metadata_batch_set_value(grpc_linked_mdelem *storage, +void grpc_metadata_batch_set_value(grpc_linked_mdelem* storage, grpc_slice value) { grpc_mdelem old_mdelem = storage->md; grpc_mdelem new_mdelem = grpc_mdelem_from_slices( @@ -232,11 +232,11 @@ void grpc_metadata_batch_set_value(grpc_linked_mdelem *storage, GRPC_MDELEM_UNREF(old_mdelem); } -grpc_error *grpc_metadata_batch_substitute(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage, +grpc_error* grpc_metadata_batch_substitute(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage, grpc_mdelem new_mdelem) { assert_valid_callouts(batch); - grpc_error *error = GRPC_ERROR_NONE; + grpc_error* error = GRPC_ERROR_NONE; grpc_mdelem old_mdelem = storage->md; if (!grpc_slice_eq(GRPC_MDKEY(new_mdelem), GRPC_MDKEY(old_mdelem))) { maybe_unlink_callout(batch, storage); @@ -254,26 +254,26 @@ grpc_error *grpc_metadata_batch_substitute(grpc_metadata_batch *batch, return error; } -void grpc_metadata_batch_clear(grpc_metadata_batch *batch) { +void grpc_metadata_batch_clear(grpc_metadata_batch* batch) { grpc_metadata_batch_destroy(batch); grpc_metadata_batch_init(batch); } -bool grpc_metadata_batch_is_empty(grpc_metadata_batch *batch) { +bool grpc_metadata_batch_is_empty(grpc_metadata_batch* batch) { return batch->list.head == NULL && batch->deadline == GRPC_MILLIS_INF_FUTURE; } -size_t grpc_metadata_batch_size(grpc_metadata_batch *batch) { +size_t grpc_metadata_batch_size(grpc_metadata_batch* batch) { size_t size = 0; - for (grpc_linked_mdelem *elem = batch->list.head; elem != NULL; + for (grpc_linked_mdelem* elem = batch->list.head; elem != NULL; elem = elem->next) { size += GRPC_MDELEM_LENGTH(elem->md); } return size; } -static void add_error(grpc_error **composite, grpc_error *error, - const char *composite_error_string) { +static void add_error(grpc_error** composite, grpc_error* error, + const char* composite_error_string) { if (error == GRPC_ERROR_NONE) return; if (*composite == GRPC_ERROR_NONE) { *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(composite_error_string); @@ -281,14 +281,14 @@ static void add_error(grpc_error **composite, grpc_error *error, *composite = grpc_error_add_child(*composite, error); } -grpc_error *grpc_metadata_batch_filter(grpc_metadata_batch *batch, +grpc_error* grpc_metadata_batch_filter(grpc_metadata_batch* batch, grpc_metadata_batch_filter_func func, - void *user_data, - const char *composite_error_string) { - grpc_linked_mdelem *l = batch->list.head; - grpc_error *error = GRPC_ERROR_NONE; + void* user_data, + const char* composite_error_string) { + grpc_linked_mdelem* l = batch->list.head; + grpc_error* error = GRPC_ERROR_NONE; while (l) { - grpc_linked_mdelem *next = l->next; + grpc_linked_mdelem* next = l->next; grpc_filtered_mdelem new_mdelem = func(user_data, l->md); add_error(&error, new_mdelem.error, composite_error_string); if (GRPC_MDISNULL(new_mdelem.md)) { diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h index 0e6ef21091..e367f61cfd 100644 --- a/src/core/lib/transport/metadata_batch.h +++ b/src/core/lib/transport/metadata_batch.h @@ -34,16 +34,16 @@ extern "C" { typedef struct grpc_linked_mdelem { grpc_mdelem md; - struct grpc_linked_mdelem *next; - struct grpc_linked_mdelem *prev; - void *reserved; + struct grpc_linked_mdelem* next; + struct grpc_linked_mdelem* prev; + void* reserved; } grpc_linked_mdelem; typedef struct grpc_mdelem_list { size_t count; size_t default_count; // Number of default keys. - grpc_linked_mdelem *head; - grpc_linked_mdelem *tail; + grpc_linked_mdelem* head; + grpc_linked_mdelem* tail; } grpc_mdelem_list; typedef struct grpc_metadata_batch { @@ -56,24 +56,24 @@ typedef struct grpc_metadata_batch { grpc_millis deadline; } grpc_metadata_batch; -void grpc_metadata_batch_init(grpc_metadata_batch *batch); -void grpc_metadata_batch_destroy(grpc_metadata_batch *batch); -void grpc_metadata_batch_clear(grpc_metadata_batch *batch); -bool grpc_metadata_batch_is_empty(grpc_metadata_batch *batch); +void grpc_metadata_batch_init(grpc_metadata_batch* batch); +void grpc_metadata_batch_destroy(grpc_metadata_batch* batch); +void grpc_metadata_batch_clear(grpc_metadata_batch* batch); +bool grpc_metadata_batch_is_empty(grpc_metadata_batch* batch); /* Returns the transport size of the batch. */ -size_t grpc_metadata_batch_size(grpc_metadata_batch *batch); +size_t grpc_metadata_batch_size(grpc_metadata_batch* batch); /** Remove \a storage from the batch, unreffing the mdelem contained */ -void grpc_metadata_batch_remove(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage); +void grpc_metadata_batch_remove(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage); /** Substitute a new mdelem for an old value */ -grpc_error *grpc_metadata_batch_substitute(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage, +grpc_error* grpc_metadata_batch_substitute(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage, grpc_mdelem new_value); -void grpc_metadata_batch_set_value(grpc_linked_mdelem *storage, +void grpc_metadata_batch_set_value(grpc_linked_mdelem* storage, grpc_slice value); /** Add \a storage to the beginning of \a batch. storage->md is @@ -81,16 +81,16 @@ void grpc_metadata_batch_set_value(grpc_linked_mdelem *storage, \a storage is owned by the caller and must survive for the lifetime of batch. This usually means it should be around for the lifetime of the call. */ -grpc_error *grpc_metadata_batch_link_head(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage) +grpc_error* grpc_metadata_batch_link_head(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage) GRPC_MUST_USE_RESULT; /** Add \a storage to the end of \a batch. storage->md is assumed to be valid. \a storage is owned by the caller and must survive for the lifetime of batch. This usually means it should be around for the lifetime of the call. */ -grpc_error *grpc_metadata_batch_link_tail(grpc_metadata_batch *batch, - grpc_linked_mdelem *storage) +grpc_error* grpc_metadata_batch_link_tail(grpc_metadata_batch* batch, + grpc_linked_mdelem* storage) GRPC_MUST_USE_RESULT; /** Add \a elem_to_add as the first element in \a batch, using @@ -99,8 +99,8 @@ grpc_error *grpc_metadata_batch_link_tail(grpc_metadata_batch *batch, lifetime of batch. This usually means it should be around for the lifetime of the call. Takes ownership of \a elem_to_add */ -grpc_error *grpc_metadata_batch_add_head( - grpc_metadata_batch *batch, grpc_linked_mdelem *storage, +grpc_error* grpc_metadata_batch_add_head( + grpc_metadata_batch* batch, grpc_linked_mdelem* storage, grpc_mdelem elem_to_add) GRPC_MUST_USE_RESULT; /** Add \a elem_to_add as the last element in \a batch, using \a storage as backing storage for the linked list element. @@ -108,14 +108,14 @@ grpc_error *grpc_metadata_batch_add_head( lifetime of batch. This usually means it should be around for the lifetime of the call. Takes ownership of \a elem_to_add */ -grpc_error *grpc_metadata_batch_add_tail( - grpc_metadata_batch *batch, grpc_linked_mdelem *storage, +grpc_error* grpc_metadata_batch_add_tail( + grpc_metadata_batch* batch, grpc_linked_mdelem* storage, grpc_mdelem elem_to_add) GRPC_MUST_USE_RESULT; -grpc_error *grpc_attach_md_to_error(grpc_error *src, grpc_mdelem md); +grpc_error* grpc_attach_md_to_error(grpc_error* src, grpc_mdelem md); typedef struct { - grpc_error *error; + grpc_error* error; grpc_mdelem md; } grpc_filtered_mdelem; @@ -127,13 +127,13 @@ typedef struct { { GRPC_ERROR_NONE, GRPC_MDNULL } typedef grpc_filtered_mdelem (*grpc_metadata_batch_filter_func)( - void *user_data, grpc_mdelem elem); -grpc_error *grpc_metadata_batch_filter( - grpc_metadata_batch *batch, grpc_metadata_batch_filter_func func, - void *user_data, const char *composite_error_string) GRPC_MUST_USE_RESULT; + void* user_data, grpc_mdelem elem); +grpc_error* grpc_metadata_batch_filter( + grpc_metadata_batch* batch, grpc_metadata_batch_filter_func func, + void* user_data, const char* composite_error_string) GRPC_MUST_USE_RESULT; #ifndef NDEBUG -void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd); +void grpc_metadata_batch_assert_ok(grpc_metadata_batch* comd); #else #define grpc_metadata_batch_assert_ok(comd) \ do { \ diff --git a/src/core/lib/transport/pid_controller.cc b/src/core/lib/transport/pid_controller.cc index 4b304f17b2..e31cc85f76 100644 --- a/src/core/lib/transport/pid_controller.cc +++ b/src/core/lib/transport/pid_controller.cc @@ -19,45 +19,30 @@ #include "src/core/lib/transport/pid_controller.h" #include <grpc/support/useful.h> -void grpc_pid_controller_init(grpc_pid_controller *pid_controller, - grpc_pid_controller_args args) { - pid_controller->args = args; - pid_controller->last_control_value = args.initial_control_value; - grpc_pid_controller_reset(pid_controller); -} +namespace grpc_core { -void grpc_pid_controller_reset(grpc_pid_controller *pid_controller) { - pid_controller->last_error = 0.0; - pid_controller->last_dc_dt = 0.0; - pid_controller->error_integral = 0.0; -} +PidController::PidController(const Args& args) + : last_control_value_(args.initial_control_value()), args_(args) {} -double grpc_pid_controller_update(grpc_pid_controller *pid_controller, - double error, double dt) { - if (dt == 0) return pid_controller->last_control_value; +double PidController::Update(double error, double dt) { + if (dt <= 0) return last_control_value_; /* integrate error using the trapezoid rule */ - pid_controller->error_integral += - dt * (pid_controller->last_error + error) * 0.5; - pid_controller->error_integral = GPR_CLAMP( - pid_controller->error_integral, -pid_controller->args.integral_range, - pid_controller->args.integral_range); - double diff_error = (error - pid_controller->last_error) / dt; + error_integral_ += dt * (last_error_ + error) * 0.5; + error_integral_ = GPR_CLAMP(error_integral_, -args_.integral_range(), + args_.integral_range()); + double diff_error = (error - last_error_) / dt; /* calculate derivative of control value vs time */ - double dc_dt = pid_controller->args.gain_p * error + - pid_controller->args.gain_i * pid_controller->error_integral + - pid_controller->args.gain_d * diff_error; + double dc_dt = args_.gain_p() * error + args_.gain_i() * error_integral_ + + args_.gain_d() * diff_error; /* and perform trapezoidal integration */ - double new_control_value = pid_controller->last_control_value + - dt * (pid_controller->last_dc_dt + dc_dt) * 0.5; - new_control_value = - GPR_CLAMP(new_control_value, pid_controller->args.min_control_value, - pid_controller->args.max_control_value); - pid_controller->last_error = error; - pid_controller->last_dc_dt = dc_dt; - pid_controller->last_control_value = new_control_value; + double new_control_value = + last_control_value_ + dt * (last_dc_dt_ + dc_dt) * 0.5; + new_control_value = GPR_CLAMP(new_control_value, args_.min_control_value(), + args_.max_control_value()); + last_error_ = error; + last_dc_dt_ = dc_dt; + last_control_value_ = new_control_value; return new_control_value; } -double grpc_pid_controller_last(grpc_pid_controller *pid_controller) { - return pid_controller->last_control_value; -} +} // namespace grpc_core diff --git a/src/core/lib/transport/pid_controller.h b/src/core/lib/transport/pid_controller.h index 80899e9a20..87e59a1a90 100644 --- a/src/core/lib/transport/pid_controller.h +++ b/src/core/lib/transport/pid_controller.h @@ -19,9 +19,7 @@ #ifndef GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H #define GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H -#ifdef __cplusplus -extern "C" { -#endif +#include <limits> /* \file Simple PID controller. Implements a proportional-integral-derivative controller. @@ -30,41 +28,87 @@ extern "C" { Gains can be set to adjust sensitivity to current error (p), the integral of error (i), and the derivative of error (d). */ -typedef struct { - double gain_p; - double gain_i; - double gain_d; - double initial_control_value; - double min_control_value; - double max_control_value; - double integral_range; -} grpc_pid_controller_args; +namespace grpc_core { -typedef struct { - double last_error; - double error_integral; - double last_control_value; - double last_dc_dt; - grpc_pid_controller_args args; -} grpc_pid_controller; +class PidController { + public: + class Args { + public: + double gain_p() const { return gain_p_; } + double gain_i() const { return gain_i_; } + double gain_d() const { return gain_d_; } + double initial_control_value() const { return initial_control_value_; } + double min_control_value() const { return min_control_value_; } + double max_control_value() const { return max_control_value_; } + double integral_range() const { return integral_range_; } -/** Initialize the controller */ -void grpc_pid_controller_init(grpc_pid_controller *pid_controller, - grpc_pid_controller_args args); + Args& set_gain_p(double gain_p) { + gain_p_ = gain_p; + return *this; + } + Args& set_gain_i(double gain_i) { + gain_i_ = gain_i; + return *this; + } + Args& set_gain_d(double gain_d) { + gain_d_ = gain_d; + return *this; + } + Args& set_initial_control_value(double initial_control_value) { + initial_control_value_ = initial_control_value; + return *this; + } + Args& set_min_control_value(double min_control_value) { + min_control_value_ = min_control_value; + return *this; + } + Args& set_max_control_value(double max_control_value) { + max_control_value_ = max_control_value; + return *this; + } + Args& set_integral_range(double integral_range) { + integral_range_ = integral_range; + return *this; + } -/** Reset the controller: useful when things have changed significantly */ -void grpc_pid_controller_reset(grpc_pid_controller *pid_controller); + private: + double gain_p_ = 0.0; + double gain_i_ = 0.0; + double gain_d_ = 0.0; + double initial_control_value_ = 0.0; + double min_control_value_ = std::numeric_limits<double>::min(); + double max_control_value_ = std::numeric_limits<double>::max(); + double integral_range_ = std::numeric_limits<double>::max(); + }; -/** Update the controller: given a current error estimate, and the time since - the last update, returns a new control value */ -double grpc_pid_controller_update(grpc_pid_controller *pid_controller, - double error, double dt); + explicit PidController(const Args& args); -/** Returns the last control value calculated */ -double grpc_pid_controller_last(grpc_pid_controller *pid_controller); + /// Reset the controller internal state: useful when the environment has + /// changed significantly + void Reset() { + last_error_ = 0.0; + last_dc_dt_ = 0.0; + error_integral_ = 0.0; + } -#ifdef __cplusplus -} -#endif + /// Update the controller: given a current error estimate, and the time since + /// the last update, returns a new control value + double Update(double error, double dt); + + /// Returns the last control value calculated + double last_control_value() const { return last_control_value_; } + + /// Returns the current error integral (mostly for testing) + double error_integral() const { return error_integral_; } + + private: + double last_error_ = 0.0; + double error_integral_ = 0.0; + double last_control_value_; + double last_dc_dt_ = 0.0; + const Args args_; +}; + +} // namespace grpc_core #endif /* GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H */ diff --git a/src/core/lib/transport/service_config.cc b/src/core/lib/transport/service_config.cc index 7aa4873d37..60b70a5f91 100644 --- a/src/core/lib/transport/service_config.cc +++ b/src/core/lib/transport/service_config.cc @@ -111,7 +111,13 @@ const char* grpc_service_config_get_lb_policy_name( static size_t count_names_in_method_config_json(grpc_json* json) { size_t num_names = 0; for (grpc_json* field = json->child; field != NULL; field = field->next) { - if (field->key != NULL && strcmp(field->key, "name") == 0) ++num_names; + if (field->key != NULL && strcmp(field->key, "name") == 0) { + if (field->type != GRPC_JSON_ARRAY) return -1; + for (grpc_json* name = field->child; name != NULL; name = name->next) { + if (name->type != GRPC_JSON_OBJECT) return -1; + ++num_names; + } + } } return num_names; } @@ -147,6 +153,7 @@ static char* parse_json_method_name(grpc_json* json) { // Returns false on error. static bool parse_json_method_config( grpc_json* json, void* (*create_value)(const grpc_json* method_config_json), + void* (*ref_value)(void* value), void (*unref_value)(void* value), grpc_slice_hash_table_entry* entries, size_t* idx) { // Construct value. void* method_config = create_value(json); @@ -161,6 +168,7 @@ static bool parse_json_method_config( if (child->type != GRPC_JSON_ARRAY) goto done; for (grpc_json* name = child->child; name != NULL; name = name->next) { char* path = parse_json_method_name(name); + if (path == NULL) goto done; gpr_strvec_add(&paths, path); } } @@ -169,11 +177,12 @@ static bool parse_json_method_config( // Add entry for each path. for (size_t i = 0; i < paths.count; ++i) { entries[*idx].key = grpc_slice_from_copied_string(paths.strs[i]); - entries[*idx].value = method_config; + entries[*idx].value = ref_value(method_config); ++*idx; } success = true; done: + unref_value(method_config); gpr_strvec_destroy(&paths); return success; } @@ -181,7 +190,7 @@ done: grpc_slice_hash_table* grpc_service_config_create_method_config_table( const grpc_service_config* service_config, void* (*create_value)(const grpc_json* method_config_json), - void (*destroy_value)(void* value)) { + void* (*ref_value)(void* value), void (*unref_value)(void* value)) { const grpc_json* json = service_config->json_tree; // Traverse parsed JSON tree. if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return NULL; @@ -195,7 +204,9 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table( // Find number of entries. for (grpc_json* method = field->child; method != NULL; method = method->next) { - num_entries += count_names_in_method_config_json(method); + size_t count = count_names_in_method_config_json(method); + if (count <= 0) return NULL; + num_entries += count; } // Populate method config table entries. entries = (grpc_slice_hash_table_entry*)gpr_malloc( @@ -203,7 +214,13 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table( size_t idx = 0; for (grpc_json* method = field->child; method != NULL; method = method->next) { - if (!parse_json_method_config(method, create_value, entries, &idx)) { + if (!parse_json_method_config(method, create_value, ref_value, + unref_value, entries, &idx)) { + for (size_t i = 0; i < idx; ++i) { + grpc_slice_unref_internal(entries[i].key); + unref_value(entries[i].value); + } + gpr_free(entries); return NULL; } } @@ -214,7 +231,7 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table( grpc_slice_hash_table* method_config_table = NULL; if (entries != NULL) { method_config_table = - grpc_slice_hash_table_create(num_entries, entries, destroy_value, NULL); + grpc_slice_hash_table_create(num_entries, entries, unref_value, NULL); gpr_free(entries); } return method_config_table; diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h index 6395a368f2..2b70e884e8 100644 --- a/src/core/lib/transport/service_config.h +++ b/src/core/lib/transport/service_config.h @@ -46,12 +46,12 @@ const char* grpc_service_config_get_lb_policy_name( /// Creates a method config table based on the data in \a json. /// The table's keys are request paths. The table's value type is /// returned by \a create_value(), based on data parsed from the JSON tree. -/// \a destroy_value is used to clean up values. +/// \a ref_value() and \a unref_value() are used to ref and unref values. /// Returns NULL on error. grpc_slice_hash_table* grpc_service_config_create_method_config_table( const grpc_service_config* service_config, void* (*create_value)(const grpc_json* method_config_json), - void (*destroy_value)(void* value)); + void* (*ref_value)(void* value), void (*unref_value)(void* value)); /// A helper function for looking up values in the table returned by /// \a grpc_service_config_create_method_config_table(). diff --git a/src/core/lib/transport/static_metadata.cc b/src/core/lib/transport/static_metadata.cc index e6c8d290f2..2213b30f56 100644 --- a/src/core/lib/transport/static_metadata.cc +++ b/src/core/lib/transport/static_metadata.cc @@ -103,8 +103,8 @@ static uint8_t g_bytes[] = { 105, 100, 101, 110, 116, 105, 116, 121, 44, 100, 101, 102, 108, 97, 116, 101, 44, 103, 122, 105, 112}; -static void static_ref(void *unused) {} -static void static_unref(void *unused) {} +static void static_ref(void* unused) {} +static void static_unref(void* unused) {} static const grpc_slice_refcount_vtable static_sub_vtable = { static_ref, static_unref, grpc_slice_default_eq_impl, grpc_slice_default_hash_impl}; diff --git a/src/core/lib/transport/static_metadata.h b/src/core/lib/transport/static_metadata.h index 299410f22c..8e73d5f278 100644 --- a/src/core/lib/transport/static_metadata.h +++ b/src/core/lib/transport/static_metadata.h @@ -541,30 +541,30 @@ typedef enum { } grpc_metadata_batch_callouts_index; typedef union { - struct grpc_linked_mdelem *array[GRPC_BATCH_CALLOUTS_COUNT]; + struct grpc_linked_mdelem* array[GRPC_BATCH_CALLOUTS_COUNT]; struct { - struct grpc_linked_mdelem *path; - struct grpc_linked_mdelem *method; - struct grpc_linked_mdelem *status; - struct grpc_linked_mdelem *authority; - struct grpc_linked_mdelem *scheme; - struct grpc_linked_mdelem *te; - struct grpc_linked_mdelem *grpc_message; - struct grpc_linked_mdelem *grpc_status; - struct grpc_linked_mdelem *grpc_payload_bin; - struct grpc_linked_mdelem *grpc_encoding; - struct grpc_linked_mdelem *grpc_accept_encoding; - struct grpc_linked_mdelem *grpc_server_stats_bin; - struct grpc_linked_mdelem *grpc_tags_bin; - struct grpc_linked_mdelem *grpc_trace_bin; - struct grpc_linked_mdelem *content_type; - struct grpc_linked_mdelem *content_encoding; - struct grpc_linked_mdelem *accept_encoding; - struct grpc_linked_mdelem *grpc_internal_encoding_request; - struct grpc_linked_mdelem *grpc_internal_stream_encoding_request; - struct grpc_linked_mdelem *user_agent; - struct grpc_linked_mdelem *host; - struct grpc_linked_mdelem *lb_token; + struct grpc_linked_mdelem* path; + struct grpc_linked_mdelem* method; + struct grpc_linked_mdelem* status; + struct grpc_linked_mdelem* authority; + struct grpc_linked_mdelem* scheme; + struct grpc_linked_mdelem* te; + struct grpc_linked_mdelem* grpc_message; + struct grpc_linked_mdelem* grpc_status; + struct grpc_linked_mdelem* grpc_payload_bin; + struct grpc_linked_mdelem* grpc_encoding; + struct grpc_linked_mdelem* grpc_accept_encoding; + struct grpc_linked_mdelem* grpc_server_stats_bin; + struct grpc_linked_mdelem* grpc_tags_bin; + struct grpc_linked_mdelem* grpc_trace_bin; + struct grpc_linked_mdelem* content_type; + struct grpc_linked_mdelem* content_encoding; + struct grpc_linked_mdelem* accept_encoding; + struct grpc_linked_mdelem* grpc_internal_encoding_request; + struct grpc_linked_mdelem* grpc_internal_stream_encoding_request; + struct grpc_linked_mdelem* user_agent; + struct grpc_linked_mdelem* host; + struct grpc_linked_mdelem* lb_token; } named; } grpc_metadata_batch_callouts; diff --git a/src/core/lib/transport/timeout_encoding.cc b/src/core/lib/transport/timeout_encoding.cc index 23a9ef308f..86db6c8344 100644 --- a/src/core/lib/transport/timeout_encoding.cc +++ b/src/core/lib/transport/timeout_encoding.cc @@ -41,15 +41,15 @@ static int64_t round_up_to_three_sig_figs(int64_t x) { } /* encode our minimum viable timeout value */ -static void enc_tiny(char *buffer) { memcpy(buffer, "1n", 3); } +static void enc_tiny(char* buffer) { memcpy(buffer, "1n", 3); } -static void enc_ext(char *buffer, int64_t value, char ext) { +static void enc_ext(char* buffer, int64_t value, char ext) { int n = int64_ttoa(value, buffer); buffer[n] = ext; buffer[n + 1] = 0; } -static void enc_seconds(char *buffer, int64_t sec) { +static void enc_seconds(char* buffer, int64_t sec) { if (sec % 3600 == 0) { enc_ext(buffer, sec / 3600, 'H'); } else if (sec % 60 == 0) { @@ -59,7 +59,7 @@ static void enc_seconds(char *buffer, int64_t sec) { } } -static void enc_millis(char *buffer, int64_t x) { +static void enc_millis(char* buffer, int64_t x) { x = round_up_to_three_sig_figs(x); if (x < GPR_MS_PER_SEC) { enc_ext(buffer, x, 'm'); @@ -72,7 +72,7 @@ static void enc_millis(char *buffer, int64_t x) { } } -void grpc_http2_encode_timeout(grpc_millis timeout, char *buffer) { +void grpc_http2_encode_timeout(grpc_millis timeout, char* buffer) { if (timeout <= 0) { enc_tiny(buffer); } else if (timeout < 1000 * GPR_MS_PER_SEC) { @@ -83,15 +83,15 @@ void grpc_http2_encode_timeout(grpc_millis timeout, char *buffer) { } } -static int is_all_whitespace(const char *p, const char *end) { +static int is_all_whitespace(const char* p, const char* end) { while (p != end && *p == ' ') p++; return p == end; } -int grpc_http2_decode_timeout(grpc_slice text, grpc_millis *timeout) { +int grpc_http2_decode_timeout(grpc_slice text, grpc_millis* timeout) { grpc_millis x = 0; - const uint8_t *p = GRPC_SLICE_START_PTR(text); - const uint8_t *end = GRPC_SLICE_END_PTR(text); + const uint8_t* p = GRPC_SLICE_START_PTR(text); + const uint8_t* end = GRPC_SLICE_END_PTR(text); int have_digit = 0; /* skip whitespace */ for (; p != end && *p == ' '; p++) @@ -138,5 +138,5 @@ int grpc_http2_decode_timeout(grpc_slice text, grpc_millis *timeout) { return 0; } p++; - return is_all_whitespace((const char *)p, (const char *)end); + return is_all_whitespace((const char*)p, (const char*)end); } diff --git a/src/core/lib/transport/timeout_encoding.h b/src/core/lib/transport/timeout_encoding.h index 91cdf0f728..9c3c4599c9 100644 --- a/src/core/lib/transport/timeout_encoding.h +++ b/src/core/lib/transport/timeout_encoding.h @@ -33,8 +33,8 @@ extern "C" { /* Encode/decode timeouts to the GRPC over HTTP/2 format; encoding may round up arbitrarily */ -void grpc_http2_encode_timeout(grpc_millis timeout, char *buffer); -int grpc_http2_decode_timeout(grpc_slice text, grpc_millis *timeout); +void grpc_http2_encode_timeout(grpc_millis timeout, char* buffer); +int grpc_http2_decode_timeout(grpc_slice text, grpc_millis* timeout); #ifdef __cplusplus } diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 2afc7ba7b5..6f31bd07f9 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -37,7 +37,7 @@ grpc_tracer_flag grpc_trace_stream_refcount = #endif #ifndef NDEBUG -void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) { +void grpc_stream_ref(grpc_stream_refcount* refcount, const char* reason) { if (GRPC_TRACER_ON(grpc_trace_stream_refcount)) { gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); gpr_log(GPR_DEBUG, "%s %p:%p REF %" PRIdPTR "->%" PRIdPTR " %s", @@ -45,13 +45,13 @@ void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) { val + 1, reason); } #else -void grpc_stream_ref(grpc_stream_refcount *refcount) { +void grpc_stream_ref(grpc_stream_refcount* refcount) { #endif gpr_ref_non_zero(&refcount->refs); } #ifndef NDEBUG -void grpc_stream_unref(grpc_stream_refcount *refcount, const char *reason) { +void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) { if (GRPC_TRACER_ON(grpc_trace_stream_refcount)) { gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); gpr_log(GPR_DEBUG, "%s %p:%p UNREF %" PRIdPTR "->%" PRIdPTR " %s", @@ -59,7 +59,7 @@ void grpc_stream_unref(grpc_stream_refcount *refcount, const char *reason) { val - 1, reason); } #else -void grpc_stream_unref(grpc_stream_refcount *refcount) { +void grpc_stream_unref(grpc_stream_refcount* refcount) { #endif if (gpr_unref(&refcount->refs)) { if (exec_ctx->flags & GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) { @@ -77,11 +77,11 @@ void grpc_stream_unref(grpc_stream_refcount *refcount) { } } -#define STREAM_REF_FROM_SLICE_REF(p) \ - ((grpc_stream_refcount *)(((uint8_t *)p) - \ - offsetof(grpc_stream_refcount, slice_refcount))) +#define STREAM_REF_FROM_SLICE_REF(p) \ + ((grpc_stream_refcount*)(((uint8_t*)p) - \ + offsetof(grpc_stream_refcount, slice_refcount))) -static void slice_stream_ref(void *p) { +static void slice_stream_ref(void* p) { #ifndef NDEBUG grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p), "slice"); #else @@ -89,7 +89,7 @@ static void slice_stream_ref(void *p) { #endif } -static void slice_stream_unref(void *p) { +static void slice_stream_unref(void* p) { #ifndef NDEBUG grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p), "slice"); #else @@ -97,12 +97,12 @@ static void slice_stream_unref(void *p) { #endif } -grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount, - void *buffer, size_t length) { +grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount, + void* buffer, size_t length) { slice_stream_ref(&refcount->slice_refcount); grpc_slice res; res.refcount = &refcount->slice_refcount, - res.data.refcounted.bytes = (uint8_t *)buffer; + res.data.refcounted.bytes = (uint8_t*)buffer; res.data.refcounted.length = length; return res; } @@ -115,13 +115,13 @@ static const grpc_slice_refcount_vtable stream_ref_slice_vtable = { }; #ifndef NDEBUG -void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, - grpc_iomgr_cb_func cb, void *cb_arg, - const char *object_type) { +void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs, + grpc_iomgr_cb_func cb, void* cb_arg, + const char* object_type) { refcount->object_type = object_type; #else -void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, - grpc_iomgr_cb_func cb, void *cb_arg) { +void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs, + grpc_iomgr_cb_func cb, void* cb_arg) { #endif gpr_ref_init(&refcount->refs, initial_refs); GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx); @@ -129,54 +129,54 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, refcount->slice_refcount.sub_refcount = &refcount->slice_refcount; } -static void move64(uint64_t *from, uint64_t *to) { +static void move64(uint64_t* from, uint64_t* to) { *to += *from; *from = 0; } -void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from, - grpc_transport_one_way_stats *to) { +void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from, + grpc_transport_one_way_stats* to) { move64(&from->framing_bytes, &to->framing_bytes); move64(&from->data_bytes, &to->data_bytes); move64(&from->header_bytes, &to->header_bytes); } -void grpc_transport_move_stats(grpc_transport_stream_stats *from, - grpc_transport_stream_stats *to) { +void grpc_transport_move_stats(grpc_transport_stream_stats* from, + grpc_transport_stream_stats* to) { grpc_transport_move_one_way_stats(&from->incoming, &to->incoming); grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing); } -size_t grpc_transport_stream_size(grpc_transport *transport) { +size_t grpc_transport_stream_size(grpc_transport* transport) { return transport->vtable->sizeof_stream; } -void grpc_transport_destroy(grpc_transport *transport) { +void grpc_transport_destroy(grpc_transport* transport) { transport->vtable->destroy(transport); } -int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, - grpc_stream_refcount *refcount, - const void *server_data, gpr_arena *arena) { +int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream, + grpc_stream_refcount* refcount, + const void* server_data, gpr_arena* arena) { return transport->vtable->init_stream(transport, stream, refcount, server_data, arena); } -void grpc_transport_perform_stream_op(grpc_transport *transport, - grpc_stream *stream, - grpc_transport_stream_op_batch *op) { +void grpc_transport_perform_stream_op(grpc_transport* transport, + grpc_stream* stream, + grpc_transport_stream_op_batch* op) { transport->vtable->perform_stream_op(transport, stream, op); } -void grpc_transport_perform_op(grpc_transport *transport, - grpc_transport_op *op) { +void grpc_transport_perform_op(grpc_transport* transport, + grpc_transport_op* op) { transport->vtable->perform_op(transport, op); } -void grpc_transport_set_pops(grpc_transport *transport, grpc_stream *stream, - grpc_polling_entity *pollent) { - grpc_pollset *pollset; - grpc_pollset_set *pollset_set; +void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream, + grpc_polling_entity* pollent) { + grpc_pollset* pollset; + grpc_pollset_set* pollset_set; if ((pollset = grpc_polling_entity_pollset(pollent)) != NULL) { transport->vtable->set_pollset(transport, stream, pollset); } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) != NULL) { @@ -186,13 +186,13 @@ void grpc_transport_set_pops(grpc_transport *transport, grpc_stream *stream, } } -void grpc_transport_destroy_stream(grpc_transport *transport, - grpc_stream *stream, - grpc_closure *then_schedule_closure) { +void grpc_transport_destroy_stream(grpc_transport* transport, + grpc_stream* stream, + grpc_closure* then_schedule_closure) { transport->vtable->destroy_stream(transport, stream, then_schedule_closure); } -grpc_endpoint *grpc_transport_get_endpoint(grpc_transport *transport) { +grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) { return transport->vtable->get_endpoint(transport); } @@ -204,8 +204,8 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_transport *transport) { // though it lives in lib, it handles transport stream ops sure // it's grpc_transport_stream_op_batch_finish_with_failure void grpc_transport_stream_op_batch_finish_with_failure( - grpc_transport_stream_op_batch *batch, grpc_error *error, - grpc_call_combiner *call_combiner) { + grpc_transport_stream_op_batch* batch, grpc_error* error, + grpc_call_combiner* call_combiner) { if (batch->send_message) { grpc_byte_stream_destroy(batch->payload->send_message.send_message); } @@ -228,18 +228,18 @@ void grpc_transport_stream_op_batch_finish_with_failure( typedef struct { grpc_closure outer_on_complete; - grpc_closure *inner_on_complete; + grpc_closure* inner_on_complete; grpc_transport_op op; } made_transport_op; -static void destroy_made_transport_op(void *arg, grpc_error *error) { - made_transport_op *op = (made_transport_op *)arg; +static void destroy_made_transport_op(void* arg, grpc_error* error) { + made_transport_op* op = (made_transport_op*)arg; GRPC_CLOSURE_SCHED(op->inner_on_complete, GRPC_ERROR_REF(error)); gpr_free(op); } -grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) { - made_transport_op *op = (made_transport_op *)gpr_malloc(sizeof(*op)); +grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete) { + made_transport_op* op = (made_transport_op*)gpr_malloc(sizeof(*op)); GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op, grpc_schedule_on_exec_ctx); op->inner_on_complete = on_complete; @@ -250,22 +250,22 @@ grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) { typedef struct { grpc_closure outer_on_complete; - grpc_closure *inner_on_complete; + grpc_closure* inner_on_complete; grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch_payload payload; } made_transport_stream_op; -static void destroy_made_transport_stream_op(void *arg, grpc_error *error) { - made_transport_stream_op *op = (made_transport_stream_op *)arg; - grpc_closure *c = op->inner_on_complete; +static void destroy_made_transport_stream_op(void* arg, grpc_error* error) { + made_transport_stream_op* op = (made_transport_stream_op*)arg; + grpc_closure* c = op->inner_on_complete; gpr_free(op); GRPC_CLOSURE_RUN(c, GRPC_ERROR_REF(error)); } -grpc_transport_stream_op_batch *grpc_make_transport_stream_op( - grpc_closure *on_complete) { - made_transport_stream_op *op = - (made_transport_stream_op *)gpr_zalloc(sizeof(*op)); +grpc_transport_stream_op_batch* grpc_make_transport_stream_op( + grpc_closure* on_complete) { + made_transport_stream_op* op = + (made_transport_stream_op*)gpr_zalloc(sizeof(*op)); op->op.payload = &op->payload; GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op, op, grpc_schedule_on_exec_ctx); diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index db61f710f7..a6692ba1c2 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -51,32 +51,32 @@ typedef struct grpc_stream_refcount { gpr_refcount refs; grpc_closure destroy; #ifndef NDEBUG - const char *object_type; + const char* object_type; #endif grpc_slice_refcount slice_refcount; } grpc_stream_refcount; #ifndef NDEBUG -void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, - grpc_iomgr_cb_func cb, void *cb_arg, - const char *object_type); -void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason); -void grpc_stream_unref(grpc_stream_refcount *refcount, const char *reason); +void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs, + grpc_iomgr_cb_func cb, void* cb_arg, + const char* object_type); +void grpc_stream_ref(grpc_stream_refcount* refcount, const char* reason); +void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason); #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \ grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype) #else -void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, - grpc_iomgr_cb_func cb, void *cb_arg); -void grpc_stream_ref(grpc_stream_refcount *refcount); -void grpc_stream_unref(grpc_stream_refcount *refcount); +void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs, + grpc_iomgr_cb_func cb, void* cb_arg); +void grpc_stream_ref(grpc_stream_refcount* refcount); +void grpc_stream_unref(grpc_stream_refcount* refcount); #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \ grpc_stream_ref_init(rc, ir, cb, cb_arg) #endif /* Wrap a buffer that is owned by some stream object into a slice that shares the same refcount */ -grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount, - void *buffer, size_t length); +grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount, + void* buffer, size_t length); typedef struct { uint64_t framing_bytes; @@ -89,14 +89,14 @@ typedef struct grpc_transport_stream_stats { grpc_transport_one_way_stats outgoing; } grpc_transport_stream_stats; -void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from, - grpc_transport_one_way_stats *to); +void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from, + grpc_transport_one_way_stats* to); -void grpc_transport_move_stats(grpc_transport_stream_stats *from, - grpc_transport_stream_stats *to); +void grpc_transport_move_stats(grpc_transport_stream_stats* from, + grpc_transport_stream_stats* to); typedef struct { - void *extra_arg; + void* extra_arg; grpc_closure closure; } grpc_handler_private_op_data; @@ -109,10 +109,10 @@ typedef struct grpc_transport_stream_op_batch { /** Should be enqueued when all requested operations (excluding recv_message and recv_initial_metadata which have their own closures) in a given batch have been completed. */ - grpc_closure *on_complete; + grpc_closure* on_complete; /** Values for the stream op (fields set are determined by flags above) */ - grpc_transport_stream_op_batch_payload *payload; + grpc_transport_stream_op_batch_payload* payload; /** Send initial metadata to the peer, from the provided metadata batch. */ bool send_initial_metadata : 1; @@ -148,17 +148,17 @@ typedef struct grpc_transport_stream_op_batch { struct grpc_transport_stream_op_batch_payload { struct { - grpc_metadata_batch *send_initial_metadata; + grpc_metadata_batch* send_initial_metadata; /** Iff send_initial_metadata != NULL, flags associated with send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */ uint32_t send_initial_metadata_flags; // If non-NULL, will be set by the transport to the peer string // (a char*, which the caller takes ownership of). - gpr_atm *peer_string; + gpr_atm* peer_string; } send_initial_metadata; struct { - grpc_metadata_batch *send_trailing_metadata; + grpc_metadata_batch* send_trailing_metadata; } send_trailing_metadata; struct { @@ -167,21 +167,21 @@ struct grpc_transport_stream_op_batch_payload { // grpc_byte_stream_destroy() on this. // The batch's on_complete will not be called until after the byte // stream is destroyed. - grpc_byte_stream *send_message; + grpc_byte_stream* send_message; } send_message; struct { - grpc_metadata_batch *recv_initial_metadata; - uint32_t *recv_flags; + grpc_metadata_batch* recv_initial_metadata; + uint32_t* recv_flags; /** Should be enqueued when initial metadata is ready to be processed. */ - grpc_closure *recv_initial_metadata_ready; + grpc_closure* recv_initial_metadata_ready; // If not NULL, will be set to true if trailing metadata is // immediately available. This may be a signal that we received a // Trailers-Only response. - bool *trailing_metadata_available; + bool* trailing_metadata_available; // If non-NULL, will be set by the transport to the peer string // (a char*, which the caller takes ownership of). - gpr_atm *peer_string; + gpr_atm* peer_string; } recv_initial_metadata; struct { @@ -189,17 +189,17 @@ struct grpc_transport_stream_op_batch_payload { // containing a received message. // The caller is responsible for calling grpc_byte_stream_destroy() // on this byte stream. - grpc_byte_stream **recv_message; + grpc_byte_stream** recv_message; /** Should be enqueued when one message is ready to be processed. */ - grpc_closure *recv_message_ready; + grpc_closure* recv_message_ready; } recv_message; struct { - grpc_metadata_batch *recv_trailing_metadata; + grpc_metadata_batch* recv_trailing_metadata; } recv_trailing_metadata; struct { - grpc_transport_stream_stats *collect_stats; + grpc_transport_stream_stats* collect_stats; } collect_stats; /** Forcefully close this stream. @@ -215,42 +215,42 @@ struct grpc_transport_stream_op_batch_payload { struct { // Error contract: the transport that gets this op must cause cancel_error // to be unref'ed after processing it - grpc_error *cancel_error; + grpc_error* cancel_error; } cancel_stream; /* Indexes correspond to grpc_context_index enum values */ - grpc_call_context_element *context; + grpc_call_context_element* context; }; /** Transport op: a set of operations to perform on a transport as a whole */ typedef struct grpc_transport_op { /** Called when processing of this op is done. */ - grpc_closure *on_consumed; + grpc_closure* on_consumed; /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ - grpc_closure *on_connectivity_state_change; - grpc_connectivity_state *connectivity_state; + grpc_closure* on_connectivity_state_change; + grpc_connectivity_state* connectivity_state; /** should the transport be disconnected * Error contract: the transport that gets this op must cause * disconnect_with_error to be unref'ed after processing it */ - grpc_error *disconnect_with_error; + grpc_error* disconnect_with_error; /** what should the goaway contain? * Error contract: the transport that gets this op must cause * goaway_error to be unref'ed after processing it */ - grpc_error *goaway_error; + grpc_error* goaway_error; /** set the callback for accepting new streams; this is a permanent callback, unlike the other one-shot closures. If true, the callback is set to set_accept_stream_fn, with its user_data argument set to set_accept_stream_user_data */ bool set_accept_stream; - void (*set_accept_stream_fn)(void *user_data, grpc_transport *transport, - const void *server_data); - void *set_accept_stream_user_data; + void (*set_accept_stream_fn)(void* user_data, grpc_transport* transport, + const void* server_data); + void* set_accept_stream_user_data; /** add this transport to a pollset */ - grpc_pollset *bind_pollset; + grpc_pollset* bind_pollset; /** add this transport to a pollset_set */ - grpc_pollset_set *bind_pollset_set; + grpc_pollset_set* bind_pollset_set; /** send a ping, call this back if not NULL */ - grpc_closure *send_ping; + grpc_closure* send_ping; /*************************************************************************** * remaining fields are initialized and used at the discretion of the @@ -261,7 +261,7 @@ typedef struct grpc_transport_op { /* Returns the amount of memory required to store a grpc_stream for this transport */ -size_t grpc_transport_stream_size(grpc_transport *transport); +size_t grpc_transport_stream_size(grpc_transport* transport); /* Initialize transport data for a stream. @@ -273,12 +273,12 @@ size_t grpc_transport_stream_size(grpc_transport *transport); stream - a pointer to uninitialized memory to initialize server_data - either NULL for a client initiated stream, or a pointer supplied from the accept_stream callback function */ -int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, - grpc_stream_refcount *refcount, - const void *server_data, gpr_arena *arena); +int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream, + grpc_stream_refcount* refcount, + const void* server_data, gpr_arena* arena); -void grpc_transport_set_pops(grpc_transport *transport, grpc_stream *stream, - grpc_polling_entity *pollent); +void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream, + grpc_polling_entity* pollent); /* Destroy transport data for a stream. @@ -290,16 +290,16 @@ void grpc_transport_set_pops(grpc_transport *transport, grpc_stream *stream, transport - the transport on which to create this stream stream - the grpc_stream to destroy (memory is still owned by the caller, but any child memory must be cleaned up) */ -void grpc_transport_destroy_stream(grpc_transport *transport, - grpc_stream *stream, - grpc_closure *then_schedule_closure); +void grpc_transport_destroy_stream(grpc_transport* transport, + grpc_stream* stream, + grpc_closure* then_schedule_closure); void grpc_transport_stream_op_batch_finish_with_failure( - grpc_transport_stream_op_batch *op, grpc_error *error, - grpc_call_combiner *call_combiner); + grpc_transport_stream_op_batch* op, grpc_error* error, + grpc_call_combiner* call_combiner); -char *grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op); -char *grpc_transport_op_string(grpc_transport_op *op); +char* grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch* op); +char* grpc_transport_op_string(grpc_transport_op* op); /* Send a batch of operations on a transport @@ -311,39 +311,39 @@ char *grpc_transport_op_string(grpc_transport_op *op); non-NULL and previously initialized by the same transport. op - a grpc_transport_stream_op_batch specifying the op to perform */ -void grpc_transport_perform_stream_op(grpc_transport *transport, - grpc_stream *stream, - grpc_transport_stream_op_batch *op); +void grpc_transport_perform_stream_op(grpc_transport* transport, + grpc_stream* stream, + grpc_transport_stream_op_batch* op); -void grpc_transport_perform_op(grpc_transport *transport, - grpc_transport_op *op); +void grpc_transport_perform_op(grpc_transport* transport, + grpc_transport_op* op); /* Send a ping on a transport Calls cb with user data when a response is received. */ -void grpc_transport_ping(grpc_transport *transport, grpc_closure *cb); +void grpc_transport_ping(grpc_transport* transport, grpc_closure* cb); /* Advise peer of pending connection termination. */ -void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status, +void grpc_transport_goaway(grpc_transport* transport, grpc_status_code status, grpc_slice debug_data); /* Close a transport. Aborts all open streams. */ -void grpc_transport_close(grpc_transport *transport); +void grpc_transport_close(grpc_transport* transport); /* Destroy the transport */ -void grpc_transport_destroy(grpc_transport *transport); +void grpc_transport_destroy(grpc_transport* transport); /* Get the endpoint used by \a transport */ -grpc_endpoint *grpc_transport_get_endpoint(grpc_transport *transport); +grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport); /* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to \a on_consumed and then delete the returned transport op */ -grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed); +grpc_transport_op* grpc_make_transport_op(grpc_closure* on_consumed); /* Allocate a grpc_transport_stream_op_batch, and preconfigure the on_consumed closure to \a on_consumed and then delete the returned transport op */ -grpc_transport_stream_op_batch *grpc_make_transport_stream_op( - grpc_closure *on_consumed); +grpc_transport_stream_op_batch* grpc_make_transport_stream_op( + grpc_closure* on_consumed); #ifdef __cplusplus } diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index c7b78afa61..8edcb5d310 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -31,43 +31,43 @@ typedef struct grpc_transport_vtable { size_t sizeof_stream; /* = sizeof(transport stream) */ /* name of this transport implementation */ - const char *name; + const char* name; /* implementation of grpc_transport_init_stream */ - int (*init_stream)(grpc_transport *self, grpc_stream *stream, - grpc_stream_refcount *refcount, const void *server_data, - gpr_arena *arena); + int (*init_stream)(grpc_transport* self, grpc_stream* stream, + grpc_stream_refcount* refcount, const void* server_data, + gpr_arena* arena); /* implementation of grpc_transport_set_pollset */ - void (*set_pollset)(grpc_transport *self, grpc_stream *stream, - grpc_pollset *pollset); + void (*set_pollset)(grpc_transport* self, grpc_stream* stream, + grpc_pollset* pollset); /* implementation of grpc_transport_set_pollset */ - void (*set_pollset_set)(grpc_transport *self, grpc_stream *stream, - grpc_pollset_set *pollset_set); + void (*set_pollset_set)(grpc_transport* self, grpc_stream* stream, + grpc_pollset_set* pollset_set); /* implementation of grpc_transport_perform_stream_op */ - void (*perform_stream_op)(grpc_transport *self, grpc_stream *stream, - grpc_transport_stream_op_batch *op); + void (*perform_stream_op)(grpc_transport* self, grpc_stream* stream, + grpc_transport_stream_op_batch* op); /* implementation of grpc_transport_perform_op */ - void (*perform_op)(grpc_transport *self, grpc_transport_op *op); + void (*perform_op)(grpc_transport* self, grpc_transport_op* op); /* implementation of grpc_transport_destroy_stream */ - void (*destroy_stream)(grpc_transport *self, grpc_stream *stream, - grpc_closure *then_schedule_closure); + void (*destroy_stream)(grpc_transport* self, grpc_stream* stream, + grpc_closure* then_schedule_closure); /* implementation of grpc_transport_destroy */ - void (*destroy)(grpc_transport *self); + void (*destroy)(grpc_transport* self); /* implementation of grpc_transport_get_endpoint */ - grpc_endpoint *(*get_endpoint)(grpc_transport *self); + grpc_endpoint* (*get_endpoint)(grpc_transport* self); } grpc_transport_vtable; /* an instance of a grpc transport */ struct grpc_transport { /* pointer to a vtable defining operations on this transport */ - const grpc_transport_vtable *vtable; + const grpc_transport_vtable* vtable; }; #ifdef __cplusplus diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc index cc11b0cc49..24e74c10c5 100644 --- a/src/core/lib/transport/transport_op_string.cc +++ b/src/core/lib/transport/transport_op_string.cc @@ -35,7 +35,7 @@ /* These routines are here to facilitate debugging - they produce string representations of various transport data structures */ -static void put_metadata(gpr_strvec *b, grpc_mdelem md) { +static void put_metadata(gpr_strvec* b, grpc_mdelem md) { gpr_strvec_add(b, gpr_strdup("key=")); gpr_strvec_add( b, grpc_dump_slice(GRPC_MDKEY(md), GPR_DUMP_HEX | GPR_DUMP_ASCII)); @@ -45,23 +45,23 @@ static void put_metadata(gpr_strvec *b, grpc_mdelem md) { b, grpc_dump_slice(GRPC_MDVALUE(md), GPR_DUMP_HEX | GPR_DUMP_ASCII)); } -static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { - grpc_linked_mdelem *m; +static void put_metadata_list(gpr_strvec* b, grpc_metadata_batch md) { + grpc_linked_mdelem* m; for (m = md.list.head; m != NULL; m = m->next) { if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", ")); put_metadata(b, m->md); } if (md.deadline != GRPC_MILLIS_INF_FUTURE) { - char *tmp; + char* tmp; gpr_asprintf(&tmp, " deadline=%" PRIdPTR, md.deadline); gpr_strvec_add(b, tmp); } } -char *grpc_transport_stream_op_batch_string( - grpc_transport_stream_op_batch *op) { - char *tmp; - char *out; +char* grpc_transport_stream_op_batch_string( + grpc_transport_stream_op_batch* op) { + char* tmp; + char* out; gpr_strvec b; gpr_strvec_init(&b); @@ -107,7 +107,7 @@ char *grpc_transport_stream_op_batch_string( if (op->cancel_stream) { gpr_strvec_add(&b, gpr_strdup(" ")); - const char *msg = + const char* msg = grpc_error_string(op->payload->cancel_stream.cancel_error); gpr_asprintf(&tmp, "CANCEL:%s", msg); @@ -127,9 +127,9 @@ char *grpc_transport_stream_op_batch_string( return out; } -char *grpc_transport_op_string(grpc_transport_op *op) { - char *tmp; - char *out; +char* grpc_transport_op_string(grpc_transport_op* op) { + char* tmp; + char* out; bool first = true; gpr_strvec b; @@ -153,7 +153,7 @@ char *grpc_transport_op_string(grpc_transport_op *op) { if (op->disconnect_with_error != GRPC_ERROR_NONE) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = false; - const char *err = grpc_error_string(op->disconnect_with_error); + const char* err = grpc_error_string(op->disconnect_with_error); gpr_asprintf(&tmp, "DISCONNECT:%s", err); gpr_strvec_add(&b, tmp); } @@ -161,7 +161,7 @@ char *grpc_transport_op_string(grpc_transport_op *op) { if (op->goaway_error) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = false; - const char *msg = grpc_error_string(op->goaway_error); + const char* msg = grpc_error_string(op->goaway_error); gpr_asprintf(&tmp, "SEND_GOAWAY:%s", msg); gpr_strvec_add(&b, tmp); @@ -199,10 +199,10 @@ char *grpc_transport_op_string(grpc_transport_op *op) { return out; } -void grpc_call_log_op(const char *file, int line, gpr_log_severity severity, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - char *str = grpc_transport_stream_op_batch_string(op); +void grpc_call_log_op(const char* file, int line, gpr_log_severity severity, + grpc_call_element* elem, + grpc_transport_stream_op_batch* op) { + char* str = grpc_transport_stream_op_batch_string(op); gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str); gpr_free(str); } |