aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Richard Belleville <gnossen@gmail.com>2018-12-11 14:49:19 -0800
committerGravatar GitHub <noreply@github.com>2018-12-11 14:49:19 -0800
commite829a8111801f5d9269d346c7188e3bd2e0a52a7 (patch)
treeb010216c31fb7304502a667ccd595eefe96638e6
parent9e9cae7839a362936228cf333045e5da877ace40 (diff)
parent6b3baf26185847f02c11401c971647624ba3d039 (diff)
Merge pull request #17460 from grpc/enable-census
Add hooks for census context propagation
-rw-r--r--src/python/grpcio/grpc/_cython/BUILD.bazel47
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi15
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi27
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi20
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi20
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pxd1
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pyx1
-rw-r--r--src/python/grpcio/grpc/_server.py72
8 files changed, 139 insertions, 64 deletions
diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel
index e318298d0a..42db7b8721 100644
--- a/src/python/grpcio/grpc/_cython/BUILD.bazel
+++ b/src/python/grpcio/grpc/_cython/BUILD.bazel
@@ -6,46 +6,47 @@ pyx_library(
name = "cygrpc",
srcs = [
"__init__.py",
- "cygrpc.pxd",
- "cygrpc.pyx",
+ "_cygrpc/_hooks.pxd.pxi",
"_cygrpc/_hooks.pyx.pxi",
- "_cygrpc/grpc_string.pyx.pxi",
+ "_cygrpc/arguments.pxd.pxi",
"_cygrpc/arguments.pyx.pxi",
+ "_cygrpc/call.pxd.pxi",
"_cygrpc/call.pyx.pxi",
- "_cygrpc/channelz.pyx.pxi",
+ "_cygrpc/channel.pxd.pxi",
"_cygrpc/channel.pyx.pxi",
- "_cygrpc/credentials.pyx.pxi",
+ "_cygrpc/channelz.pyx.pxi",
+ "_cygrpc/completion_queue.pxd.pxi",
"_cygrpc/completion_queue.pyx.pxi",
- "_cygrpc/event.pyx.pxi",
- "_cygrpc/fork_posix.pyx.pxi",
- "_cygrpc/metadata.pyx.pxi",
- "_cygrpc/operation.pyx.pxi",
- "_cygrpc/records.pyx.pxi",
- "_cygrpc/security.pyx.pxi",
- "_cygrpc/server.pyx.pxi",
- "_cygrpc/tag.pyx.pxi",
- "_cygrpc/time.pyx.pxi",
- "_cygrpc/grpc_gevent.pyx.pxi",
- "_cygrpc/grpc.pxi",
- "_cygrpc/_hooks.pxd.pxi",
- "_cygrpc/arguments.pxd.pxi",
- "_cygrpc/call.pxd.pxi",
- "_cygrpc/channel.pxd.pxi",
"_cygrpc/credentials.pxd.pxi",
- "_cygrpc/completion_queue.pxd.pxi",
+ "_cygrpc/credentials.pyx.pxi",
"_cygrpc/event.pxd.pxi",
+ "_cygrpc/event.pyx.pxi",
"_cygrpc/fork_posix.pxd.pxi",
+ "_cygrpc/fork_posix.pyx.pxi",
+ "_cygrpc/grpc.pxi",
+ "_cygrpc/grpc_gevent.pxd.pxi",
+ "_cygrpc/grpc_gevent.pyx.pxi",
+ "_cygrpc/grpc_string.pyx.pxi",
"_cygrpc/metadata.pxd.pxi",
+ "_cygrpc/metadata.pyx.pxi",
"_cygrpc/operation.pxd.pxi",
+ "_cygrpc/operation.pyx.pxi",
+ "_cygrpc/propagation_bits.pxd.pxi",
+ "_cygrpc/propagation_bits.pyx.pxi",
"_cygrpc/records.pxd.pxi",
+ "_cygrpc/records.pyx.pxi",
"_cygrpc/security.pxd.pxi",
+ "_cygrpc/security.pyx.pxi",
"_cygrpc/server.pxd.pxi",
+ "_cygrpc/server.pyx.pxi",
"_cygrpc/tag.pxd.pxi",
+ "_cygrpc/tag.pyx.pxi",
"_cygrpc/time.pxd.pxi",
- "_cygrpc/grpc_gevent.pxd.pxi",
+ "_cygrpc/time.pyx.pxi",
+ "cygrpc.pxd",
+ "cygrpc.pyx",
],
deps = [
"//:grpc",
],
)
-
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi
index 38cf629dc2..cd4a51a635 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi
@@ -15,3 +15,18 @@
cdef object _custom_op_on_c_call(int op, grpc_call *call):
raise NotImplementedError("No custom hooks are implemented")
+
+def install_census_context_from_call(Call call):
+ pass
+
+def uninstall_context():
+ pass
+
+def build_context():
+ pass
+
+cdef class CensusContext:
+ pass
+
+def set_census_context_on_call(_CallState call_state, CensusContext census_ctx):
+ pass
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index a81ff4d823..135d224095 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -159,7 +159,8 @@ cdef void _call(
_ChannelState channel_state, _CallState call_state,
grpc_completion_queue *c_completion_queue, on_success, int flags, method,
host, object deadline, CallCredentials credentials,
- object operationses_and_user_tags, object metadata) except *:
+ object operationses_and_user_tags, object metadata,
+ object context) except *:
"""Invokes an RPC.
Args:
@@ -185,6 +186,7 @@ cdef void _call(
which is an object to be used as a tag. A SendInitialMetadataOperation
must be present in the first element of this value.
metadata: The metadata for this call.
+ context: Context object for distributed tracing.
"""
cdef grpc_slice method_slice
cdef grpc_slice host_slice
@@ -208,6 +210,8 @@ cdef void _call(
grpc_slice_unref(method_slice)
if host_slice_ptr:
grpc_slice_unref(host_slice)
+ if context is not None:
+ set_census_context_on_call(call_state, context)
if credentials is not None:
c_call_credentials = credentials.c()
c_call_error = grpc_call_set_credentials(
@@ -257,7 +261,8 @@ cdef class IntegratedCall:
cdef IntegratedCall _integrated_call(
_ChannelState state, int flags, method, host, object deadline,
- object metadata, CallCredentials credentials, operationses_and_user_tags):
+ object metadata, CallCredentials credentials, operationses_and_user_tags,
+ object context):
call_state = _CallState()
def on_success(started_tags):
@@ -266,7 +271,7 @@ cdef IntegratedCall _integrated_call(
_call(
state, call_state, state.c_call_completion_queue, on_success, flags,
- method, host, deadline, credentials, operationses_and_user_tags, metadata)
+ method, host, deadline, credentials, operationses_and_user_tags, metadata, context)
return IntegratedCall(state, call_state)
@@ -308,7 +313,8 @@ cdef class SegregatedCall:
cdef SegregatedCall _segregated_call(
_ChannelState state, int flags, method, host, object deadline,
- object metadata, CallCredentials credentials, operationses_and_user_tags):
+ object metadata, CallCredentials credentials, operationses_and_user_tags,
+ object context):
cdef _CallState call_state = _CallState()
cdef SegregatedCall segregated_call
cdef grpc_completion_queue *c_completion_queue
@@ -325,7 +331,8 @@ cdef SegregatedCall _segregated_call(
try:
_call(
state, call_state, c_completion_queue, on_success, flags, method, host,
- deadline, credentials, operationses_and_user_tags, metadata)
+ deadline, credentials, operationses_and_user_tags, metadata,
+ context)
except:
_destroy_c_completion_queue(c_completion_queue)
raise
@@ -443,10 +450,11 @@ cdef class Channel:
def integrated_call(
self, int flags, method, host, object deadline, object metadata,
- CallCredentials credentials, operationses_and_tags):
+ CallCredentials credentials, operationses_and_tags,
+ object context = None):
return _integrated_call(
self._state, flags, method, host, deadline, metadata, credentials,
- operationses_and_tags)
+ operationses_and_tags, context)
def next_call_event(self):
def on_success(tag):
@@ -461,10 +469,11 @@ cdef class Channel:
def segregated_call(
self, int flags, method, host, object deadline, object metadata,
- CallCredentials credentials, operationses_and_tags):
+ CallCredentials credentials, operationses_and_tags,
+ object context = None):
return _segregated_call(
self._state, flags, method, host, deadline, metadata, credentials,
- operationses_and_tags)
+ operationses_and_tags, context)
def check_connectivity_state(self, bint try_to_connect):
with self._state.condition:
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi
new file mode 100644
index 0000000000..cd6e94c816
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi
@@ -0,0 +1,20 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef extern from "grpc/impl/codegen/propagation_bits.h":
+ cdef int _GRPC_PROPAGATE_DEADLINE "GRPC_PROPAGATE_DEADLINE"
+ cdef int _GRPC_PROPAGATE_CENSUS_STATS_CONTEXT "GRPC_PROPAGATE_CENSUS_STATS_CONTEXT"
+ cdef int _GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT "GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT"
+ cdef int _GRPC_PROPAGATE_CANCELLATION "GRPC_PROPAGATE_CANCELLATION"
+ cdef int _GRPC_PROPAGATE_DEFAULTS "GRPC_PROPAGATE_DEFAULTS"
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi
new file mode 100644
index 0000000000..2dcc76a2db
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi
@@ -0,0 +1,20 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class PropagationConstants:
+ GRPC_PROPAGATE_DEADLINE = _GRPC_PROPAGATE_DEADLINE
+ GRPC_PROPAGATE_CENSUS_STATS_CONTEXT = _GRPC_PROPAGATE_CENSUS_STATS_CONTEXT
+ GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT = _GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT
+ GRPC_PROPAGATE_CANCELLATION = _GRPC_PROPAGATE_CANCELLATION
+ GRPC_PROPAGATE_DEFAULTS = _GRPC_PROPAGATE_DEFAULTS
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd
index 8258b857bc..64cae6b34d 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pxd
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd
@@ -29,6 +29,7 @@ include "_cygrpc/server.pxd.pxi"
include "_cygrpc/tag.pxd.pxi"
include "_cygrpc/time.pxd.pxi"
include "_cygrpc/_hooks.pxd.pxi"
+include "_cygrpc/propagation_bits.pxd.pxi"
include "_cygrpc/grpc_gevent.pxd.pxi"
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index 9ab919375c..ce98fa3a8e 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -36,6 +36,7 @@ include "_cygrpc/tag.pyx.pxi"
include "_cygrpc/time.pyx.pxi"
include "_cygrpc/_hooks.pyx.pxi"
include "_cygrpc/channelz.pyx.pxi"
+include "_cygrpc/propagation_bits.pyx.pxi"
include "_cygrpc/grpc_gevent.pyx.pxi"
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 7276a7fd90..e939f615df 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -480,43 +480,51 @@ def _status(rpc_event, state, serialized_response):
def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
- argument = argument_thunk()
- if argument is not None:
- response, proceed = _call_behavior(rpc_event, state, behavior, argument,
- request_deserializer)
- if proceed:
- serialized_response = _serialize_response(
- rpc_event, state, response, response_serializer)
- if serialized_response is not None:
- _status(rpc_event, state, serialized_response)
+ cygrpc.install_census_context_from_call(rpc_event.call)
+ try:
+ argument = argument_thunk()
+ if argument is not None:
+ response, proceed = _call_behavior(rpc_event, state, behavior,
+ argument, request_deserializer)
+ if proceed:
+ serialized_response = _serialize_response(
+ rpc_event, state, response, response_serializer)
+ if serialized_response is not None:
+ _status(rpc_event, state, serialized_response)
+ finally:
+ cygrpc.uninstall_context()
def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
- argument = argument_thunk()
- if argument is not None:
- response_iterator, proceed = _call_behavior(
- rpc_event, state, behavior, argument, request_deserializer)
- if proceed:
- while True:
- response, proceed = _take_response_from_response_iterator(
- rpc_event, state, response_iterator)
- if proceed:
- if response is None:
- _status(rpc_event, state, None)
- break
- else:
- serialized_response = _serialize_response(
- rpc_event, state, response, response_serializer)
- if serialized_response is not None:
- proceed = _send_response(rpc_event, state,
- serialized_response)
- if not proceed:
- break
- else:
+ cygrpc.install_census_context_from_call(rpc_event.call)
+ try:
+ argument = argument_thunk()
+ if argument is not None:
+ response_iterator, proceed = _call_behavior(
+ rpc_event, state, behavior, argument, request_deserializer)
+ if proceed:
+ while True:
+ response, proceed = _take_response_from_response_iterator(
+ rpc_event, state, response_iterator)
+ if proceed:
+ if response is None:
+ _status(rpc_event, state, None)
break
- else:
- break
+ else:
+ serialized_response = _serialize_response(
+ rpc_event, state, response, response_serializer)
+ if serialized_response is not None:
+ proceed = _send_response(
+ rpc_event, state, serialized_response)
+ if not proceed:
+ break
+ else:
+ break
+ else:
+ break
+ finally:
+ cygrpc.uninstall_context()
def _handle_unary_unary(rpc_event, state, method_handler, thread_pool):