diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-12-07 15:14:14 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-12-07 15:14:14 -0800 |
commit | 87b71e2b55d0b56b4c4778ee842224fcd8f46744 (patch) | |
tree | dacbf03e8f819f5a272a024a0ca29a22fb2b0db7 /src/core/transport | |
parent | 26dab31f6ec8c6b6ede97aa00eaaa4b68d6363a7 (diff) | |
parent | f62c4d5a988f37d812373580137ff69a77305102 (diff) |
Merge branch 'connected-subchannel' into ping-ping-ping-ping-ping-ping-ping-ping-ping
Diffstat (limited to 'src/core/transport')
-rw-r--r-- | src/core/transport/chttp2_transport.c | 2 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.c | 73 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.h | 11 | ||||
-rw-r--r-- | src/core/transport/static_metadata.c | 103 | ||||
-rw-r--r-- | src/core/transport/transport.c | 21 | ||||
-rw-r--r-- | src/core/transport/transport.h | 17 |
6 files changed, 92 insertions, 135 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 3e88f69abf..6ba9db8348 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -910,7 +910,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); - if (op->on_connectivity_state_change) { + if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, op->on_connectivity_state_change); diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 09b298c131..c409d983da 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -88,7 +88,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker) { if (grpc_connectivity_state_trace) { - gpr_log(GPR_DEBUG, "CONWATCH: %s: get %s", tracker->name, + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name, grpc_connectivity_state_name(tracker->current_state)); } return tracker->current_state; @@ -98,42 +98,47 @@ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify) { if (grpc_connectivity_state_trace) { - gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s] notify=%p", - tracker->name, grpc_connectivity_state_name(*current), - grpc_connectivity_state_name(tracker->current_state), notify); + if (current == NULL) { + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker, + tracker->name, notify); + } else { + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker, + tracker->name, grpc_connectivity_state_name(*current), + grpc_connectivity_state_name(tracker->current_state), notify); + } } - if (tracker->current_state != *current) { - *current = tracker->current_state; - grpc_exec_ctx_enqueue(exec_ctx, notify, 1); + if (current == NULL) { + grpc_connectivity_state_watcher *w = tracker->watchers; + if (w != NULL && w->notify == notify) { + grpc_exec_ctx_enqueue(exec_ctx, notify, 0); + tracker->watchers = w->next; + gpr_free(w); + return 0; + } + while (w != NULL) { + grpc_connectivity_state_watcher *rm_candidate = w->next; + if (rm_candidate != NULL && rm_candidate->notify == notify) { + grpc_exec_ctx_enqueue(exec_ctx, notify, 0); + w->next = w->next->next; + gpr_free(rm_candidate); + return 0; + } + w = w->next; + } + return 0; } else { - grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); - w->current = current; - w->notify = notify; - w->next = tracker->watchers; - tracker->watchers = w; - } - return tracker->current_state == GRPC_CHANNEL_IDLE; -} - -int grpc_connectivity_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, - grpc_closure *subscribed_notify) { - grpc_connectivity_state_watcher *w = tracker->watchers; - if (w != NULL && w->notify == subscribed_notify) { - tracker->watchers = w->next; - gpr_free(w); - return 1; - } - while (w != NULL) { - grpc_connectivity_state_watcher *rm_candidate = w->next; - if (rm_candidate != NULL && rm_candidate->notify == subscribed_notify) { - w->next = w->next->next; - gpr_free(rm_candidate); - return 1; + if (tracker->current_state != *current) { + *current = tracker->current_state; + grpc_exec_ctx_enqueue(exec_ctx, notify, 1); + } else { + grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); + w->current = current; + w->notify = notify; + w->next = tracker->watchers; + tracker->watchers = w; } - w = w->next; + return tracker->current_state == GRPC_CHANNEL_IDLE; } - return 0; } void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, @@ -142,7 +147,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, const char *reason) { grpc_connectivity_state_watcher *w; if (grpc_connectivity_state_trace) { - gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name, + gpr_log(GPR_DEBUG, "SET: %p %s: %s --> %s [%s]", tracker, tracker->name, grpc_connectivity_state_name(tracker->current_state), grpc_connectivity_state_name(state), reason); } diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index 119b1c1554..1f3a130e90 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -73,16 +73,11 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); -/** Return 1 if the channel should start connecting, 0 otherwise */ +/** Return 1 if the channel should start connecting, 0 otherwise. + If current==NULL cancel notify if it is already queued (success==0 in that + case) */ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify); -/** Remove \a subscribed_notify from the list of closures to be called on a - * state change if present, returning 1. Otherwise, nothing is done and return - * 0. */ -int grpc_connectivity_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, - grpc_closure *subscribed_notify); - #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */ diff --git a/src/core/transport/static_metadata.c b/src/core/transport/static_metadata.c index e7aff325c2..c2bfef0397 100644 --- a/src/core/transport/static_metadata.c +++ b/src/core/transport/static_metadata.c @@ -65,92 +65,23 @@ const gpr_uint8 80, 81, 82, 33, 83, 33, 84, 33, 85, 33, 86, 33}; const char *const grpc_static_metadata_strings[GRPC_STATIC_MDSTR_COUNT] = { - "0", - "1", - "2", - "200", - "204", - "206", - "304", - "400", - "404", - "500", - "accept", - "accept-charset", - "accept-encoding", - "accept-language", - "accept-ranges", - "access-control-allow-origin", - "age", - "allow", - "application/grpc", - ":authority", - "authorization", - "cache-control", - "content-disposition", - "content-encoding", - "content-language", - "content-length", - "content-location", - "content-range", - "content-type", - "cookie", - "date", - "deflate", - "deflate,gzip", - "", - "etag", - "expect", - "expires", - "from", - "GET", - "grpc", - "grpc-accept-encoding", - "grpc-encoding", - "grpc-internal-encoding-request", - "grpc-message", - "grpc-status", - "grpc-timeout", - "gzip", - "gzip, deflate", - "host", - "http", - "https", - "identity", - "identity,deflate", - "identity,deflate,gzip", - "identity,gzip", - "if-match", - "if-modified-since", - "if-none-match", - "if-range", - "if-unmodified-since", - "last-modified", - "link", - "location", - "max-forwards", - ":method", - ":path", - "POST", - "proxy-authenticate", - "proxy-authorization", - "range", - "referer", - "refresh", - "retry-after", - ":scheme", - "server", - "set-cookie", - "/", - "/index.html", - ":status", - "strict-transport-security", - "te", - "trailers", - "transfer-encoding", - "user-agent", - "vary", - "via", + "0", "1", "2", "200", "204", "206", "304", "400", "404", "500", "accept", + "accept-charset", "accept-encoding", "accept-language", "accept-ranges", + "access-control-allow-origin", "age", "allow", "application/grpc", + ":authority", "authorization", "cache-control", "content-disposition", + "content-encoding", "content-language", "content-length", + "content-location", "content-range", "content-type", "cookie", "date", + "deflate", "deflate,gzip", "", "etag", "expect", "expires", "from", "GET", + "grpc", "grpc-accept-encoding", "grpc-encoding", + "grpc-internal-encoding-request", "grpc-message", "grpc-status", + "grpc-timeout", "gzip", "gzip, deflate", "host", "http", "https", + "identity", "identity,deflate", "identity,deflate,gzip", "identity,gzip", + "if-match", "if-modified-since", "if-none-match", "if-range", + "if-unmodified-since", "last-modified", "link", "location", "max-forwards", + ":method", ":path", "POST", "proxy-authenticate", "proxy-authorization", + "range", "referer", "refresh", "retry-after", ":scheme", "server", + "set-cookie", "/", "/index.html", ":status", "strict-transport-security", + "te", "trailers", "transfer-encoding", "user-agent", "vary", "via", "www-authenticate"}; const gpr_uint8 grpc_static_accept_encoding_metadata[8] = {0, 29, 26, 30, diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index f2bebc62f3..2ab978be46 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -40,8 +40,8 @@ #ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) { gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); - gpr_log(GPR_DEBUG, "STREAM %p:%p REF %d->%d %s", refcount, - refcount->destroy.cb_arg, val, val + 1, reason); + gpr_log(GPR_DEBUG, "%s %p:%p REF %d->%d %s", refcount->object_type, + refcount, refcount->destroy.cb_arg, val, val + 1, reason); #else void grpc_stream_ref(grpc_stream_refcount *refcount) { #endif @@ -52,8 +52,8 @@ void grpc_stream_ref(grpc_stream_refcount *refcount) { void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount, const char *reason) { gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); - gpr_log(GPR_DEBUG, "STREAM %p:%p UNREF %d->%d %s", refcount, - refcount->destroy.cb_arg, val, val - 1, reason); + gpr_log(GPR_DEBUG, "%s %p:%p UNREF %d->%d %s", refcount->object_type, + refcount, refcount->destroy.cb_arg, val, val - 1, reason); #else void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount) { @@ -63,6 +63,19 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, } } +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg, + const char *object_type) { + refcount->object_type = object_type; +#else +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg) { +#endif + gpr_ref_init(&refcount->refs, initial_refs); + grpc_closure_init(&refcount->destroy, cb, cb_arg); +} + size_t grpc_transport_stream_size(grpc_transport *transport) { return transport->vtable->sizeof_stream; } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f296ce8251..f94f0ae76e 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,19 +50,32 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; +/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ + typedef struct grpc_stream_refcount { gpr_refcount refs; grpc_closure destroy; +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + const char *object_type; +#endif } grpc_stream_refcount; -/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ #ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg, + const char *object_type); void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason); void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount, const char *reason); +#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \ + grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype) #else +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg); void grpc_stream_ref(grpc_stream_refcount *refcount); void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount); +#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \ + grpc_stream_ref_init(rc, ir, cb, cb_arg) #endif /* Transport stream op: a set of operations to perform on a transport @@ -96,7 +109,7 @@ typedef struct grpc_transport_stream_op { typedef struct grpc_transport_op { /** called when processing of this op is done */ grpc_closure *on_consumed; - /** connectivity monitoring */ + /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ grpc_closure *on_connectivity_state_change; grpc_connectivity_state *connectivity_state; /** should the transport be disconnected */ |