diff options
author | Richard Belleville <rbellevi@google.com> | 2018-12-11 10:18:43 -0800 |
---|---|---|
committer | Richard Belleville <rbellevi@google.com> | 2018-12-11 12:59:31 -0800 |
commit | 6b3baf26185847f02c11401c971647624ba3d039 (patch) | |
tree | 931823bff40477f6ec4d9929a994b1211abb0e0c /src/python | |
parent | 6f675abe85927a2f6f18babc4445b7f210e41d5e (diff) |
Add hooks for census context propagation
Appease the yapf gods
Reformat
Diffstat (limited to 'src/python')
-rw-r--r-- | src/python/grpcio/grpc/_cython/BUILD.bazel | 47 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi | 15 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi | 27 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi | 20 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi | 20 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/cygrpc.pxd | 1 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/cygrpc.pyx | 1 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_server.py | 72 |
8 files changed, 139 insertions, 64 deletions
diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel index e318298d0a..42db7b8721 100644 --- a/src/python/grpcio/grpc/_cython/BUILD.bazel +++ b/src/python/grpcio/grpc/_cython/BUILD.bazel @@ -6,46 +6,47 @@ pyx_library( name = "cygrpc", srcs = [ "__init__.py", - "cygrpc.pxd", - "cygrpc.pyx", + "_cygrpc/_hooks.pxd.pxi", "_cygrpc/_hooks.pyx.pxi", - "_cygrpc/grpc_string.pyx.pxi", + "_cygrpc/arguments.pxd.pxi", "_cygrpc/arguments.pyx.pxi", + "_cygrpc/call.pxd.pxi", "_cygrpc/call.pyx.pxi", - "_cygrpc/channelz.pyx.pxi", + "_cygrpc/channel.pxd.pxi", "_cygrpc/channel.pyx.pxi", - "_cygrpc/credentials.pyx.pxi", + "_cygrpc/channelz.pyx.pxi", + "_cygrpc/completion_queue.pxd.pxi", "_cygrpc/completion_queue.pyx.pxi", - "_cygrpc/event.pyx.pxi", - "_cygrpc/fork_posix.pyx.pxi", - "_cygrpc/metadata.pyx.pxi", - "_cygrpc/operation.pyx.pxi", - "_cygrpc/records.pyx.pxi", - "_cygrpc/security.pyx.pxi", - "_cygrpc/server.pyx.pxi", - "_cygrpc/tag.pyx.pxi", - "_cygrpc/time.pyx.pxi", - "_cygrpc/grpc_gevent.pyx.pxi", - "_cygrpc/grpc.pxi", - "_cygrpc/_hooks.pxd.pxi", - "_cygrpc/arguments.pxd.pxi", - "_cygrpc/call.pxd.pxi", - "_cygrpc/channel.pxd.pxi", "_cygrpc/credentials.pxd.pxi", - "_cygrpc/completion_queue.pxd.pxi", + "_cygrpc/credentials.pyx.pxi", "_cygrpc/event.pxd.pxi", + "_cygrpc/event.pyx.pxi", "_cygrpc/fork_posix.pxd.pxi", + "_cygrpc/fork_posix.pyx.pxi", + "_cygrpc/grpc.pxi", + "_cygrpc/grpc_gevent.pxd.pxi", + "_cygrpc/grpc_gevent.pyx.pxi", + "_cygrpc/grpc_string.pyx.pxi", "_cygrpc/metadata.pxd.pxi", + "_cygrpc/metadata.pyx.pxi", "_cygrpc/operation.pxd.pxi", + "_cygrpc/operation.pyx.pxi", + "_cygrpc/propagation_bits.pxd.pxi", + "_cygrpc/propagation_bits.pyx.pxi", "_cygrpc/records.pxd.pxi", + "_cygrpc/records.pyx.pxi", "_cygrpc/security.pxd.pxi", + "_cygrpc/security.pyx.pxi", "_cygrpc/server.pxd.pxi", + "_cygrpc/server.pyx.pxi", "_cygrpc/tag.pxd.pxi", + "_cygrpc/tag.pyx.pxi", "_cygrpc/time.pxd.pxi", - "_cygrpc/grpc_gevent.pxd.pxi", + "_cygrpc/time.pyx.pxi", + "cygrpc.pxd", + "cygrpc.pyx", ], deps = [ "//:grpc", ], ) - diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi index 38cf629dc2..cd4a51a635 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi @@ -15,3 +15,18 @@ cdef object _custom_op_on_c_call(int op, grpc_call *call): raise NotImplementedError("No custom hooks are implemented") + +def install_census_context_from_call(Call call): + pass + +def uninstall_context(): + pass + +def build_context(): + pass + +cdef class CensusContext: + pass + +def set_census_context_on_call(_CallState call_state, CensusContext census_ctx): + pass diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index a81ff4d823..135d224095 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -159,7 +159,8 @@ cdef void _call( _ChannelState channel_state, _CallState call_state, grpc_completion_queue *c_completion_queue, on_success, int flags, method, host, object deadline, CallCredentials credentials, - object operationses_and_user_tags, object metadata) except *: + object operationses_and_user_tags, object metadata, + object context) except *: """Invokes an RPC. Args: @@ -185,6 +186,7 @@ cdef void _call( which is an object to be used as a tag. A SendInitialMetadataOperation must be present in the first element of this value. metadata: The metadata for this call. + context: Context object for distributed tracing. """ cdef grpc_slice method_slice cdef grpc_slice host_slice @@ -208,6 +210,8 @@ cdef void _call( grpc_slice_unref(method_slice) if host_slice_ptr: grpc_slice_unref(host_slice) + if context is not None: + set_census_context_on_call(call_state, context) if credentials is not None: c_call_credentials = credentials.c() c_call_error = grpc_call_set_credentials( @@ -257,7 +261,8 @@ cdef class IntegratedCall: cdef IntegratedCall _integrated_call( _ChannelState state, int flags, method, host, object deadline, - object metadata, CallCredentials credentials, operationses_and_user_tags): + object metadata, CallCredentials credentials, operationses_and_user_tags, + object context): call_state = _CallState() def on_success(started_tags): @@ -266,7 +271,7 @@ cdef IntegratedCall _integrated_call( _call( state, call_state, state.c_call_completion_queue, on_success, flags, - method, host, deadline, credentials, operationses_and_user_tags, metadata) + method, host, deadline, credentials, operationses_and_user_tags, metadata, context) return IntegratedCall(state, call_state) @@ -308,7 +313,8 @@ cdef class SegregatedCall: cdef SegregatedCall _segregated_call( _ChannelState state, int flags, method, host, object deadline, - object metadata, CallCredentials credentials, operationses_and_user_tags): + object metadata, CallCredentials credentials, operationses_and_user_tags, + object context): cdef _CallState call_state = _CallState() cdef SegregatedCall segregated_call cdef grpc_completion_queue *c_completion_queue @@ -325,7 +331,8 @@ cdef SegregatedCall _segregated_call( try: _call( state, call_state, c_completion_queue, on_success, flags, method, host, - deadline, credentials, operationses_and_user_tags, metadata) + deadline, credentials, operationses_and_user_tags, metadata, + context) except: _destroy_c_completion_queue(c_completion_queue) raise @@ -443,10 +450,11 @@ cdef class Channel: def integrated_call( self, int flags, method, host, object deadline, object metadata, - CallCredentials credentials, operationses_and_tags): + CallCredentials credentials, operationses_and_tags, + object context = None): return _integrated_call( self._state, flags, method, host, deadline, metadata, credentials, - operationses_and_tags) + operationses_and_tags, context) def next_call_event(self): def on_success(tag): @@ -461,10 +469,11 @@ cdef class Channel: def segregated_call( self, int flags, method, host, object deadline, object metadata, - CallCredentials credentials, operationses_and_tags): + CallCredentials credentials, operationses_and_tags, + object context = None): return _segregated_call( self._state, flags, method, host, deadline, metadata, credentials, - operationses_and_tags) + operationses_and_tags, context) def check_connectivity_state(self, bint try_to_connect): with self._state.condition: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi new file mode 100644 index 0000000000..cd6e94c816 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi @@ -0,0 +1,20 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef extern from "grpc/impl/codegen/propagation_bits.h": + cdef int _GRPC_PROPAGATE_DEADLINE "GRPC_PROPAGATE_DEADLINE" + cdef int _GRPC_PROPAGATE_CENSUS_STATS_CONTEXT "GRPC_PROPAGATE_CENSUS_STATS_CONTEXT" + cdef int _GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT "GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT" + cdef int _GRPC_PROPAGATE_CANCELLATION "GRPC_PROPAGATE_CANCELLATION" + cdef int _GRPC_PROPAGATE_DEFAULTS "GRPC_PROPAGATE_DEFAULTS" diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi new file mode 100644 index 0000000000..2dcc76a2db --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi @@ -0,0 +1,20 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class PropagationConstants: + GRPC_PROPAGATE_DEADLINE = _GRPC_PROPAGATE_DEADLINE + GRPC_PROPAGATE_CENSUS_STATS_CONTEXT = _GRPC_PROPAGATE_CENSUS_STATS_CONTEXT + GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT = _GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT + GRPC_PROPAGATE_CANCELLATION = _GRPC_PROPAGATE_CANCELLATION + GRPC_PROPAGATE_DEFAULTS = _GRPC_PROPAGATE_DEFAULTS diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index 8258b857bc..64cae6b34d 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -29,6 +29,7 @@ include "_cygrpc/server.pxd.pxi" include "_cygrpc/tag.pxd.pxi" include "_cygrpc/time.pxd.pxi" include "_cygrpc/_hooks.pxd.pxi" +include "_cygrpc/propagation_bits.pxd.pxi" include "_cygrpc/grpc_gevent.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 9ab919375c..ce98fa3a8e 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -36,6 +36,7 @@ include "_cygrpc/tag.pyx.pxi" include "_cygrpc/time.pyx.pxi" include "_cygrpc/_hooks.pyx.pxi" include "_cygrpc/channelz.pyx.pxi" +include "_cygrpc/propagation_bits.pyx.pxi" include "_cygrpc/grpc_gevent.pyx.pxi" diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 7276a7fd90..e939f615df 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -480,43 +480,51 @@ def _status(rpc_event, state, serialized_response): def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk, request_deserializer, response_serializer): - argument = argument_thunk() - if argument is not None: - response, proceed = _call_behavior(rpc_event, state, behavior, argument, - request_deserializer) - if proceed: - serialized_response = _serialize_response( - rpc_event, state, response, response_serializer) - if serialized_response is not None: - _status(rpc_event, state, serialized_response) + cygrpc.install_census_context_from_call(rpc_event.call) + try: + argument = argument_thunk() + if argument is not None: + response, proceed = _call_behavior(rpc_event, state, behavior, + argument, request_deserializer) + if proceed: + serialized_response = _serialize_response( + rpc_event, state, response, response_serializer) + if serialized_response is not None: + _status(rpc_event, state, serialized_response) + finally: + cygrpc.uninstall_context() def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk, request_deserializer, response_serializer): - argument = argument_thunk() - if argument is not None: - response_iterator, proceed = _call_behavior( - rpc_event, state, behavior, argument, request_deserializer) - if proceed: - while True: - response, proceed = _take_response_from_response_iterator( - rpc_event, state, response_iterator) - if proceed: - if response is None: - _status(rpc_event, state, None) - break - else: - serialized_response = _serialize_response( - rpc_event, state, response, response_serializer) - if serialized_response is not None: - proceed = _send_response(rpc_event, state, - serialized_response) - if not proceed: - break - else: + cygrpc.install_census_context_from_call(rpc_event.call) + try: + argument = argument_thunk() + if argument is not None: + response_iterator, proceed = _call_behavior( + rpc_event, state, behavior, argument, request_deserializer) + if proceed: + while True: + response, proceed = _take_response_from_response_iterator( + rpc_event, state, response_iterator) + if proceed: + if response is None: + _status(rpc_event, state, None) break - else: - break + else: + serialized_response = _serialize_response( + rpc_event, state, response, response_serializer) + if serialized_response is not None: + proceed = _send_response( + rpc_event, state, serialized_response) + if not proceed: + break + else: + break + else: + break + finally: + cygrpc.uninstall_context() def _handle_unary_unary(rpc_event, state, method_handler, thread_pool): |