diff options
-rwxr-xr-x | grpc.gemspec | 2 | ||||
-rw-r--r-- | include/grpc/grpc.h | 12 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_channel.py | 82 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_byte_buffer.c | 1 | ||||
-rw-r--r-- | templates/grpc.gemspec.template | 2 | ||||
-rwxr-xr-x | tools/run_tests/run_tests.py | 4 |
6 files changed, 58 insertions, 45 deletions
diff --git a/grpc.gemspec b/grpc.gemspec index 088b99b1e6..724922ea46 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -27,7 +27,7 @@ Gem::Specification.new do |s| s.require_paths = %w( src/ruby/bin src/ruby/lib src/ruby/pb ) s.platform = Gem::Platform::RUBY - s.add_dependency 'google-protobuf', '~> 3.0.2' + s.add_dependency 'google-protobuf', '~> 3.1.0' s.add_dependency 'googleauth', '~> 0.5.1' s.add_development_dependency 'bundler', '~> 1.9' diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 6f7a67b715..8cb278ff8d 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -197,9 +197,15 @@ GRPCAPI grpc_call *grpc_channel_create_registered_call( completion of type 'tag' to the completion queue bound to the call. The order of ops specified in the batch has no significance. Only one operation of each type can be active at once in any given - batch. You must call grpc_completion_queue_next or - grpc_completion_queue_pluck on the completion queue associated with 'call' - for work to be performed. + batch. + If a call to grpc_call_start_batch returns GRPC_CALL_OK you must call + grpc_completion_queue_next or grpc_completion_queue_pluck on the completion + queue associated with 'call' for work to be performed. If a call to + grpc_call_start_batch returns any value other than GRPC_CALL_OK it is + guaranteed that no state associated with 'call' is changed and it is not + appropriate to call grpc_completion_queue_next or + grpc_completion_queue_pluck consequent to the failed grpc_call_start_batch + call. THREAD SAFETY: access to grpc_call_start_batch in multi-threaded environment needs to be synchronized. As an optimization, you may synchronize batches containing just send operations independently from batches containing just diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 3117dd1cb3..3ac735a4ec 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -36,8 +36,8 @@ import time import grpc from grpc import _common from grpc import _grpcio_metadata -from grpc.framework.foundation import callable_util from grpc._cython import cygrpc +from grpc.framework.foundation import callable_util _USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__) @@ -358,7 +358,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): if self._state.callbacks is None: return False else: - self._state.callbacks.append(lambda: callback()) + self._state.callbacks.append(callback) return True def initial_metadata(self): @@ -435,10 +435,10 @@ def _end_unary_response_blocking(state, with_call, deadline): class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): def __init__( - self, channel, create_managed_call, method, request_serializer, + self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel - self._create_managed_call = create_managed_call + self._managed_call = managed_call self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer @@ -490,23 +490,24 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): if rendezvous: return rendezvous else: - call = self._create_managed_call( + call, drive_call = self._managed_call( None, 0, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: call.start_client_batch(cygrpc.Operations(operations), event_handler) + drive_call() return _Rendezvous(state, call, self._response_deserializer, deadline) class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): def __init__( - self, channel, create_managed_call, method, request_serializer, + self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel - self._create_managed_call = create_managed_call + self._managed_call = managed_call self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer @@ -518,7 +519,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): raise rendezvous else: state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) - call = self._create_managed_call( + call, drive_call = self._managed_call( None, 0, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) @@ -536,16 +537,17 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), ) call.start_client_batch(cygrpc.Operations(operations), event_handler) + drive_call() return _Rendezvous(state, call, self._response_deserializer, deadline) class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): def __init__( - self, channel, create_managed_call, method, request_serializer, + self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel - self._create_managed_call = create_managed_call + self._managed_call = managed_call self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer @@ -597,7 +599,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): self, request_iterator, timeout=None, metadata=None, credentials=None): deadline, deadline_timespec = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) - call = self._create_managed_call( + call, drive_call = self._managed_call( None, 0, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) @@ -614,6 +616,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), ) call.start_client_batch(cygrpc.Operations(operations), event_handler) + drive_call() _consume_request_iterator( request_iterator, state, call, self._request_serializer) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -622,10 +625,10 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): def __init__( - self, channel, create_managed_call, method, request_serializer, + self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel - self._create_managed_call = create_managed_call + self._managed_call = managed_call self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer @@ -634,7 +637,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): self, request_iterator, timeout=None, metadata=None, credentials=None): deadline, deadline_timespec = _deadline(timeout) state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) - call = self._create_managed_call( + call, drive_call = self._managed_call( None, 0, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) @@ -650,6 +653,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), ) call.start_client_batch(cygrpc.Operations(operations), event_handler) + drive_call() _consume_request_iterator( request_iterator, state, call, self._request_serializer) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -687,16 +691,13 @@ def _run_channel_spin_thread(state): channel_spin_thread.start() -def _create_channel_managed_call(state): - def create_channel_managed_call(parent, flags, method, host, deadline): - """Creates a managed cygrpc.Call. +def _channel_managed_call_management(state): + def create(parent, flags, method, host, deadline): + """Creates a managed cygrpc.Call and a function to call to drive it. - Callers of this function must conduct at least one operation on the returned - call. The tags associated with operations conducted on the returned call - must be no-argument callables that return None to indicate that this channel - should continue polling for events associated with the call and return the - call itself to indicate that no more events associated with the call will be - generated. + If operations are successfully added to the returned cygrpc.Call, the + returned function must be called. If operations are not successfully added + to the returned cygrpc.Call, the returned function must not be called. Args: parent: A cygrpc.Call to be used as the parent of the created call. @@ -706,18 +707,22 @@ def _create_channel_managed_call(state): deadline: A cygrpc.Timespec to be the deadline of the created call. Returns: - A cygrpc.Call with which to conduct an RPC. + A cygrpc.Call with which to conduct an RPC and a function to call if + operations are successfully started on the call. """ - with state.lock: - call = state.channel.create_call( - parent, flags, state.completion_queue, method, host, deadline) - if state.managed_calls is None: - state.managed_calls = set((call,)) - _run_channel_spin_thread(state) - else: - state.managed_calls.add(call) - return call - return create_channel_managed_call + call = state.channel.create_call( + parent, flags, state.completion_queue, method, host, deadline) + + def drive(): + with state.lock: + if state.managed_calls is None: + state.managed_calls = set((call,)) + _run_channel_spin_thread(state) + else: + state.managed_calls.add(call) + + return call, drive + return create class _ChannelConnectivityState(object): @@ -857,6 +862,7 @@ def _options(options): class Channel(grpc.Channel): + """A cygrpc.Channel-backed implementation of grpc.Channel.""" def __init__(self, target, options, credentials): """Constructor. @@ -880,25 +886,25 @@ class Channel(grpc.Channel): def unary_unary( self, method, request_serializer=None, response_deserializer=None): return _UnaryUnaryMultiCallable( - self._channel, _create_channel_managed_call(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def unary_stream( self, method, request_serializer=None, response_deserializer=None): return _UnaryStreamMultiCallable( - self._channel, _create_channel_managed_call(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def stream_unary( self, method, request_serializer=None, response_deserializer=None): return _StreamUnaryMultiCallable( - self._channel, _create_channel_managed_call(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def stream_stream( self, method, request_serializer=None, response_deserializer=None): return _StreamStreamMultiCallable( - self._channel, _create_channel_managed_call(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def __del__(self): diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c index 61b7c30315..28c6a0fd3a 100644 --- a/src/ruby/ext/grpc/rb_byte_buffer.c +++ b/src/ruby/ext/grpc/rb_byte_buffer.c @@ -65,5 +65,6 @@ VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) { GPR_SLICE_LENGTH(next)); gpr_slice_unref(next); } + grpc_byte_buffer_reader_destroy(&reader); return rb_string; } diff --git a/templates/grpc.gemspec.template b/templates/grpc.gemspec.template index 62d61b75c1..82fbb69008 100644 --- a/templates/grpc.gemspec.template +++ b/templates/grpc.gemspec.template @@ -29,7 +29,7 @@ s.require_paths = %w( src/ruby/bin src/ruby/lib src/ruby/pb ) s.platform = Gem::Platform::RUBY - s.add_dependency 'google-protobuf', '~> 3.0.2' + s.add_dependency 'google-protobuf', '~> 3.1.0' s.add_dependency 'googleauth', '~> 0.5.1' s.add_development_dependency 'bundler', '~> 1.9' diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index cd49dfa907..0424cf3d6f 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -435,8 +435,8 @@ class PythonLanguage(object): return [self.config.job_spec( config.run, timeout_seconds=5*60, - environ=dict(environment.items() + - [('GRPC_PYTHON_TESTRUNNER_FILTER', suite_name)]), + environ=dict(list(environment.items()) + + [('GRPC_PYTHON_TESTRUNNER_FILTER', str(suite_name))]), shortname='%s.test.%s' % (config.name, suite_name),) for suite_name in tests_json for config in self.pythons] |