diff options
author | Julien Boeuf <jboeuf@google.com> | 2015-05-29 20:35:57 -0700 |
---|---|---|
committer | Julien Boeuf <jboeuf@google.com> | 2015-05-29 20:35:57 -0700 |
commit | 449db81b7f47f8bc443443d6b095070bf3c8c28d (patch) | |
tree | 95690713220afa09fd8f75bd9ed9d05721822617 /src | |
parent | 1f030301234b3802d1e47b5173dc744ffca7bcdc (diff) | |
parent | 5242c1e22a18f09f25737fd67732805cf55be751 (diff) |
Merge branch 'master' of github.com:grpc/grpc into mdctx_free_creds
Diffstat (limited to 'src')
30 files changed, 175 insertions, 72 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 63615ea25f..b697fcc64a 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -165,7 +165,7 @@ static void maybe_wake_one_watcher(grpc_fd *fd) { gpr_mu_unlock(&fd->watcher_mu); } -static void wake_all_watchers(grpc_fd *fd) { +static void wake_all_watchers_locked(grpc_fd *fd) { grpc_fd_watcher *watcher; for (watcher = fd->inactive_watcher_root.next; watcher != &fd->inactive_watcher_root; watcher = watcher->next) { @@ -184,7 +184,9 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { fd->on_done_user_data = user_data; shutdown(fd->fd, SHUT_RDWR); ref_by(fd, 1); /* remove active status, but keep referenced */ - wake_all_watchers(fd); + gpr_mu_lock(&fd->watcher_mu); + wake_all_watchers_locked(fd); + gpr_mu_unlock(&fd->watcher_mu); unref_by(fd, 2); /* drop the reference */ } diff --git a/src/core/support/cmdline.c b/src/core/support/cmdline.c index 72f46c1bd7..530952c437 100644 --- a/src/core/support/cmdline.c +++ b/src/core/support/cmdline.c @@ -131,33 +131,63 @@ void gpr_cmdline_on_extra_arg( cl->extra_arg_help = help; } -static void print_usage_and_die(gpr_cmdline *cl) { +/* recursively descend argument list, adding the last element + to s first - so that arguments are added in the order they were + added to the list by api calls */ +static void add_args_to_usage(gpr_strvec *s, arg *a) { + char *tmp; + + if (!a) return; + add_args_to_usage(s, a->next); + + switch (a->type) { + case ARGTYPE_BOOL: + gpr_asprintf(&tmp, " [--%s|--no-%s]", a->name, a->name); + gpr_strvec_add(s, tmp); + break; + case ARGTYPE_STRING: + gpr_asprintf(&tmp, " [--%s=string]", a->name); + gpr_strvec_add(s, tmp); + break; + case ARGTYPE_INT: + gpr_asprintf(&tmp, " [--%s=int]", a->name); + gpr_strvec_add(s, tmp); + break; + } +} + +char *gpr_cmdline_usage_string(gpr_cmdline *cl, const char *argv0) { /* TODO(ctiller): make this prettier */ - arg *a; - const char *name = strrchr(cl->argv0, '/'); + gpr_strvec s; + char *tmp; + const char *name = strrchr(argv0, '/'); + if (name) { name++; } else { - name = cl->argv0; - } - fprintf(stderr, "Usage: %s", name); - for (a = cl->args; a; a = a->next) { - switch (a->type) { - case ARGTYPE_BOOL: - fprintf(stderr, " [--%s|--no-%s]", a->name, a->name); - break; - case ARGTYPE_STRING: - fprintf(stderr, " [--%s=string]", a->name); - break; - case ARGTYPE_INT: - fprintf(stderr, " [--%s=int]", a->name); - break; - } + name = argv0; } + + gpr_strvec_init(&s); + + gpr_asprintf(&tmp, "Usage: %s", name); + gpr_strvec_add(&s, tmp); + add_args_to_usage(&s, cl->args); if (cl->extra_arg) { - fprintf(stderr, " [%s...]", cl->extra_arg_name); + gpr_asprintf(&tmp, " [%s...]", cl->extra_arg_name); + gpr_strvec_add(&s, tmp); } - fprintf(stderr, "\n"); + gpr_strvec_add(&s, gpr_strdup("\n")); + + tmp = gpr_strvec_flatten(&s, NULL); + gpr_strvec_destroy(&s); + return tmp; +} + +static void print_usage_and_die(gpr_cmdline *cl) { + char *usage = gpr_cmdline_usage_string(cl, cl->argv0); + fprintf(stderr, "%s", usage); + gpr_free(usage); exit(1); } diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index e66b4ed2d8..80eb488b41 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -67,6 +67,10 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_metadata_array_init(&request_metadata_); } + ~SyncRequest() { + grpc_metadata_array_destroy(&request_metadata_); + } + static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { void* tag = nullptr; *ok = false; diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.nuspec b/src/csharp/Grpc.Auth/Grpc.Auth.nuspec index 85aee35566..171259d18d 100644 --- a/src/csharp/Grpc.Auth/Grpc.Auth.nuspec +++ b/src/csharp/Grpc.Auth/Grpc.Auth.nuspec @@ -7,7 +7,7 @@ <description>Auth library for C# implementation of gRPC - an RPC library and framework. See project site for more info.</description> <version>0.5.0</version> <authors>Google Inc.</authors> - <owners>jtattermusch</owners> + <owners>grpc-packages</owners> <licenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</licenseUrl> <projectUrl>https://github.com/grpc/grpc</projectUrl> <requireLicenseAcceptance>false</requireLicenseAcceptance> diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec index 69e8497bb7..42eb90c9a3 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.nuspec +++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec @@ -7,7 +7,7 @@ <description>Core C# implementation of gRPC - an RPC library and framework. See project site for more info.</description> <version>0.5.0</version> <authors>Google Inc.</authors> - <owners>jtattermusch</owners> + <owners>grpc-packages</owners> <licenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</licenseUrl> <projectUrl>https://github.com/grpc/grpc</projectUrl> <requireLicenseAcceptance>false</requireLicenseAcceptance> diff --git a/src/csharp/Grpc.nuspec b/src/csharp/Grpc.nuspec index e3195e1806..b9a76f2c1a 100644 --- a/src/csharp/Grpc.nuspec +++ b/src/csharp/Grpc.nuspec @@ -7,7 +7,7 @@ <description>C# implementation of gRPC - an RPC library and framework. See project site for more info.</description> <version>0.5.0</version> <authors>Google Inc.</authors> - <owners>jtattermusch</owners> + <owners>grpc-packages</owners> <licenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</licenseUrl> <projectUrl>https://github.com/grpc/grpc</projectUrl> <requireLicenseAcceptance>false</requireLicenseAcceptance> @@ -18,5 +18,8 @@ <dependency id="Grpc.Core" version="0.5.0" /> </dependencies> </metadata> - <files/> + <files> + <file src="protoc.exe" target="tools" /> + <file src="grpc_csharp_plugin.exe" target="tools" /> + </files> </package> diff --git a/src/csharp/README.md b/src/csharp/README.md index 43d519349f..bb5e165986 100644 --- a/src/csharp/README.md +++ b/src/csharp/README.md @@ -35,15 +35,16 @@ Usage: Linux (Mono) - (preferred approach) add `libgrpc_csharp_ext.so` to `/etc/ld.so.cache` by running: ```sh - echo "$HOME/.linuxbrew/lib" | sudo tee /etc/ld.so.conf.d/zzz_brew_lib.conf - sudo ldconfig + $ echo "$HOME/.linuxbrew/lib" | sudo tee /etc/ld.so.conf.d/zzz_brew_lib.conf + $ sudo ldconfig ``` - (adhoc approach) set `LD_LIBRARY_PATH` environment variable to point to directory containing `libgrpc_csharp_ext.so`: ```sh - export LD_LIBRARY_PATH=$HOME/.linuxbrew/lib:${LD_LIBRARY_PATH} + $ export LD_LIBRARY_PATH=$HOME/.linuxbrew/lib:${LD_LIBRARY_PATH} ``` + - (if you are contributor) installing gRPC from sources using `sudo make install_grpc_csharp_ext` also works. - Open MonoDevelop and start a new project/solution. @@ -87,14 +88,14 @@ If you are a user of gRPC C#, go to Usage section above. a convenience batch script that builds everything for you. ``` - buildall.bat + > buildall.bat ``` - Open Grpc.sln using Visual Studio 2013. NuGet dependencies will be restored upon build (you need to have NuGet add-in installed). -Building: Linux & Mono +Building: Linux (Mono) ---------------------- You only need to go through these steps if you are planning to develop gRPC C#. @@ -103,8 +104,8 @@ If you are a user of gRPC C#, go to Usage section above. - Prerequisites for development: Mono 3.2.8+, MonoDevelop 5.9 with NuGet and NUnit add-ins installed. ```sh - sudo apt-get install mono-devel - sudo apt-get install nunit nunit-console + $ sudo apt-get install mono-devel + $ sudo apt-get install nunit nunit-console ``` You can use older versions of MonoDevelop, but then you might need to restore @@ -114,8 +115,8 @@ don't support NuGet add-in. - Compile and install the gRPC C# extension library (that will be used via P/Invoke from C#). ```sh - make grpc_csharp_ext - sudo make install_grpc_csharp_ext + $ make grpc_csharp_ext + $ sudo make install_grpc_csharp_ext ``` - Use MonoDevelop to open the solution Grpc.sln @@ -135,9 +136,9 @@ Then you should be able to run all the test from the Test View. After building the solution, you can also run the tests from command line using nunit-console tool. -``` +```sh # from Grpc.Core.Test/bin/Debug directory -nunit-console Grpc.Core.Tests.dll +$ nunit-console Grpc.Core.Tests.dll ``` Contents diff --git a/src/php/bin/run_gen_code_test.sh b/src/php/bin/run_gen_code_test.sh index 79abbe6cf8..4882a2b846 100755 --- a/src/php/bin/run_gen_code_test.sh +++ b/src/php/bin/run_gen_code_test.sh @@ -29,9 +29,9 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. cd $(dirname $0) -GRPC_TEST_HOST=localhost:7070 php -d extension_dir=../ext/grpc/modules/ \ +GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \ -d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \ ../tests/generated_code/GeneratedCodeTest.php -GRPC_TEST_HOST=localhost:7070 php -d extension_dir=../ext/grpc/modules/ \ +GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \ -d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \ ../tests/generated_code/GeneratedCodeWithCallbackTest.php diff --git a/src/python/interop/interop/_insecure_interop_test.py b/src/python/interop/interop/_insecure_interop_test.py index 42e7a4d5c4..98ea3a6648 100644 --- a/src/python/interop/interop/_insecure_interop_test.py +++ b/src/python/interop/interop/_insecure_interop_test.py @@ -54,4 +54,4 @@ class InsecureInteropTest( if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/interop/interop/_interop_test_case.py b/src/python/interop/interop/_interop_test_case.py index cd6a574e90..f40ef0ec83 100644 --- a/src/python/interop/interop/_interop_test_case.py +++ b/src/python/interop/interop/_interop_test_case.py @@ -53,3 +53,9 @@ class InteropTestCase(object): def testPingPong(self): methods.TestCase.PING_PONG.test_interoperability(self.stub, None) + + def testCancelAfterBegin(self): + methods.TestCase.CANCEL_AFTER_BEGIN.test_interoperability(self.stub, None) + + def testCancelAfterFirstResponse(self): + methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability(self.stub, None) diff --git a/src/python/interop/interop/_secure_interop_test.py b/src/python/interop/interop/_secure_interop_test.py index 27e76315b6..be7618f549 100644 --- a/src/python/interop/interop/_secure_interop_test.py +++ b/src/python/interop/interop/_secure_interop_test.py @@ -61,4 +61,4 @@ class SecureInteropTest( if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/interop/interop/methods.py b/src/python/interop/interop/methods.py index 909b738bd1..194afadb17 100644 --- a/src/python/interop/interop/methods.py +++ b/src/python/interop/interop/methods.py @@ -219,6 +219,17 @@ def _server_streaming(stub): raise ValueError( 'response body of invalid size %d!' % len(response.payload.body)) +def _cancel_after_begin(stub): + with stub: + sizes = (27182, 8, 1828, 45904) + payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes] + requests = [messages_pb2.StreamingInputCallRequest(payload=payload) + for payload in payloads] + responses = stub.StreamingInputCall.async(requests, _TIMEOUT) + responses.cancel() + if not responses.cancelled(): + raise ValueError('expected call to be cancelled') + class _Pipe(object): @@ -249,13 +260,18 @@ class _Pipe(object): self._open = False self._condition.notify() + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + def _ping_pong(stub): request_response_sizes = (31415, 9, 2653, 58979) request_payload_sizes = (27182, 8, 1828, 45904) - with stub: - pipe = _Pipe() + with stub, _Pipe() as pipe: response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) print 'Starting ping-pong with response iterator %s' % response_iterator for response_size, payload_size in zip( @@ -273,7 +289,33 @@ def _ping_pong(stub): if len(response.payload.body) != response_size: raise ValueError( 'response body of invalid size %d!' % len(response.payload.body)) - pipe.close() + + +def _cancel_after_first_response(stub): + request_response_sizes = (31415, 9, 2653, 58979) + request_payload_sizes = (27182, 8, 1828, 45904) + with stub, _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) + + response_size = request_response_sizes[0] + payload_size = request_payload_sizes[0] + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=(messages_pb2.ResponseParameters( + size=response_size),), + payload=messages_pb2.Payload(body=b'\x00' * payload_size)) + pipe.add(request) + response = next(response_iterator) + # We test the contents of `response` in the Ping Pong test - don't check + # them here. + response_iterator.cancel() + + try: + next(response_iterator) + except Exception: + pass + else: + raise ValueError('expected call to be cancelled') def _compute_engine_creds(stub, args): @@ -305,6 +347,8 @@ class TestCase(enum.Enum): SERVER_STREAMING = 'server_streaming' CLIENT_STREAMING = 'client_streaming' PING_PONG = 'ping_pong' + CANCEL_AFTER_BEGIN = 'cancel_after_begin' + CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' COMPUTE_ENGINE_CREDS = 'compute_engine_creds' SERVICE_ACCOUNT_CREDS = 'service_account_creds' @@ -319,6 +363,10 @@ class TestCase(enum.Enum): _client_streaming(stub) elif self is TestCase.PING_PONG: _ping_pong(stub) + elif self is TestCase.CANCEL_AFTER_BEGIN: + _cancel_after_begin(stub) + elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE: + _cancel_after_first_response(stub) elif self is TestCase.COMPUTE_ENGINE_CREDS: _compute_engine_creds(stub, args) elif self is TestCase.SERVICE_ACCOUNT_CREDS: diff --git a/src/python/src/grpc/_adapter/_blocking_invocation_inline_service_test.py b/src/python/src/grpc/_adapter/_blocking_invocation_inline_service_test.py index 3cd51928d3..7a8ff0ad89 100644 --- a/src/python/src/grpc/_adapter/_blocking_invocation_inline_service_test.py +++ b/src/python/src/grpc/_adapter/_blocking_invocation_inline_service_test.py @@ -43,4 +43,4 @@ class BlockingInvocationInlineServiceTest( if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/_adapter/_c_test.py b/src/python/src/grpc/_adapter/_c_test.py index 6e15adbda8..b06215f0e5 100644 --- a/src/python/src/grpc/_adapter/_c_test.py +++ b/src/python/src/grpc/_adapter/_c_test.py @@ -216,4 +216,4 @@ class _CTest(unittest.TestCase): if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/_adapter/_event_invocation_synchronous_event_service_test.py b/src/python/src/grpc/_adapter/_event_invocation_synchronous_event_service_test.py index b9a13ce69f..b8ceb75d68 100644 --- a/src/python/src/grpc/_adapter/_event_invocation_synchronous_event_service_test.py +++ b/src/python/src/grpc/_adapter/_event_invocation_synchronous_event_service_test.py @@ -43,4 +43,4 @@ class EventInvocationSynchronousEventServiceTest( if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/_adapter/_future_invocation_asynchronous_event_service_test.py b/src/python/src/grpc/_adapter/_future_invocation_asynchronous_event_service_test.py index 7d6a4ffc17..3773e65575 100644 --- a/src/python/src/grpc/_adapter/_future_invocation_asynchronous_event_service_test.py +++ b/src/python/src/grpc/_adapter/_future_invocation_asynchronous_event_service_test.py @@ -43,4 +43,4 @@ class FutureInvocationAsynchronousEventServiceTest( if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py index 4fd76f60f8..50257d8691 100644 --- a/src/python/src/grpc/_adapter/_links_test.py +++ b/src/python/src/grpc/_adapter/_links_test.py @@ -274,4 +274,4 @@ class RoundTripTest(unittest.TestCase): if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py index bdb1ee2379..7f5021f40e 100644 --- a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py +++ b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py @@ -97,4 +97,4 @@ class LonelyRearLinkTest(unittest.TestCase): if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py index 09c4660a2b..d4b628c2ae 100644 --- a/src/python/src/grpc/_adapter/_low_test.py +++ b/src/python/src/grpc/_adapter/_low_test.py @@ -412,4 +412,4 @@ class ExpirationTest(unittest.TestCase): if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/early_adopter/implementations_test.py b/src/python/src/grpc/early_adopter/implementations_test.py index 32b974724c..49f0e949c4 100644 --- a/src/python/src/grpc/early_adopter/implementations_test.py +++ b/src/python/src/grpc/early_adopter/implementations_test.py @@ -177,4 +177,4 @@ class EarlyAdopterImplementationsTest(unittest.TestCase): if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/framework/base/implementations_test.py b/src/python/src/grpc/framework/base/implementations_test.py index d40bb4d92e..72087f4456 100644 --- a/src/python/src/grpc/framework/base/implementations_test.py +++ b/src/python/src/grpc/framework/base/implementations_test.py @@ -77,4 +77,4 @@ class ImplementationsTest( if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/framework/face/blocking_invocation_inline_service_test.py b/src/python/src/grpc/framework/face/blocking_invocation_inline_service_test.py index 636cd701ff..763f0f0edc 100644 --- a/src/python/src/grpc/framework/face/blocking_invocation_inline_service_test.py +++ b/src/python/src/grpc/framework/face/blocking_invocation_inline_service_test.py @@ -43,4 +43,4 @@ class BlockingInvocationInlineServiceTest( if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/framework/face/event_invocation_synchronous_event_service_test.py b/src/python/src/grpc/framework/face/event_invocation_synchronous_event_service_test.py index 25f3e297b5..e1ab3cf711 100644 --- a/src/python/src/grpc/framework/face/event_invocation_synchronous_event_service_test.py +++ b/src/python/src/grpc/framework/face/event_invocation_synchronous_event_service_test.py @@ -43,4 +43,4 @@ class EventInvocationSynchronousEventServiceTest( if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/framework/face/future_invocation_asynchronous_event_service_test.py b/src/python/src/grpc/framework/face/future_invocation_asynchronous_event_service_test.py index 38229ea9f4..2d13bb911d 100644 --- a/src/python/src/grpc/framework/face/future_invocation_asynchronous_event_service_test.py +++ b/src/python/src/grpc/framework/face/future_invocation_asynchronous_event_service_test.py @@ -43,4 +43,4 @@ class FutureInvocationAsynchronousEventServiceTest( if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/framework/foundation/_later_test.py b/src/python/src/grpc/framework/foundation/_later_test.py index e83e703128..6c2459e185 100644 --- a/src/python/src/grpc/framework/foundation/_later_test.py +++ b/src/python/src/grpc/framework/foundation/_later_test.py @@ -148,4 +148,4 @@ class LaterTest(unittest.TestCase): self.assertEqual(return_value, future_passed_to_callback_cell[0].result()) if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/python/src/grpc/framework/foundation/_logging_pool_test.py b/src/python/src/grpc/framework/foundation/_logging_pool_test.py index c92cf8c0ab..452802da6a 100644 --- a/src/python/src/grpc/framework/foundation/_logging_pool_test.py +++ b/src/python/src/grpc/framework/foundation/_logging_pool_test.py @@ -61,4 +61,4 @@ class LoggingPoolTest(unittest.TestCase): if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2) diff --git a/src/ruby/bin/interop/interop_server.rb b/src/ruby/bin/interop/interop_server.rb index 78cb8dd836..2ba8d2c19e 100755 --- a/src/ruby/bin/interop/interop_server.rb +++ b/src/ruby/bin/interop/interop_server.rb @@ -128,16 +128,19 @@ class TestTarget < Grpc::Testing::TestService::Service cls = StreamingOutputCallResponse Thread.new do begin + GRPC.logger.info('interop-server: started receiving') reqs.each do |req| - GRPC.logger.info("read #{req.inspect}") resp_size = req.response_parameters[0].size + GRPC.logger.info("read a req, response size is #{resp_size}") resp = cls.new(payload: Payload.new(type: req.response_type, body: nulls(resp_size))) q.push(resp) end - GRPC.logger.info('finished reads') + GRPC.logger.info('interop-server: finished receiving') q.push(self) rescue StandardError => e + GRPC.logger.info('interop-server: failed') + GRPC.logger.warn(e) q.push(e) # share the exception with the enumerator end end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 5f7beb5ab1..04abab8ac3 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -39,6 +39,7 @@ class Struct return nil if status.nil? fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED if status.code != GRPC::Core::StatusCodes::OK + GRPC.logger.debug("Failing with status #{status}") # raise BadStatus, propagating the metadata if present. md = status.metadata with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }] diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 67143d40cf..f1b9f6b00d 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -100,6 +100,7 @@ module GRPC replys = gen_each_reply.call(each_queued_msg) @enq_th = start_write_loop(replys, is_client: false) @loop_th = start_read_loop + @enq_th.join if @enq_th.alive? end private @@ -115,7 +116,7 @@ module GRPC return enum_for(:each_queued_msg) unless block_given? count = 0 loop do - GRPC.logger.debug("each_queued_msg: msg##{count}") + GRPC.logger.debug("each_queued_msg: waiting##{count}") count += 1 req = @readq.pop GRPC.logger.debug("each_queued_msg: req = #{req}") @@ -123,70 +124,73 @@ module GRPC break if req.equal?(END_OF_READS) yield req end - @enq_th.join if @enq_th.alive? end # during bidi-streaming, read the requests to send from a separate thread # read so that read_loop does not block waiting for requests to read. def start_write_loop(requests, is_client: true) Thread.new do # TODO: run on a thread pool - write_tag = Object.new + GRPC.logger.debug('bidi-write-loop: starting') begin + write_tag = Object.new count = 0 requests.each do |req| - GRPC.logger.debug("bidi-write_loop: #{count}") + GRPC.logger.debug("bidi-write-loop: #{count}") count += 1 payload = @marshal.call(req) @call.run_batch(@cq, write_tag, INFINITE_FUTURE, SEND_MESSAGE => payload) end + GRPC.logger.debug("bidi-write-loop: #{count} writes done") if is_client - GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting") + GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") + @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil) batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_CLOSE_FROM_CLIENT => nil, RECV_STATUS_ON_CLIENT => nil) batch_result.check_status end rescue StandardError => e - GRPC.logger.warn('bidi-write_loop: failed') + GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn(e) raise e end + GRPC.logger.debug('bidi-write-loop: finished') end end # starts the read loop def start_read_loop Thread.new do + GRPC.logger.debug('bidi-read-loop: starting') begin read_tag = Object.new count = 0 - # queue the initial read before beginning the loop loop do - GRPC.logger.debug("bidi-read_loop: #{count}") + GRPC.logger.debug("bidi-read-loop: #{count}") count += 1 # TODO: ensure metadata is read if available, currently it's not batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, RECV_MESSAGE => nil) # handle the next message if batch_result.message.nil? + GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") @readq.push(END_OF_READS) GRPC.logger.debug('bidi-read-loop: done reading!') break end # push the latest read onto the queue and continue reading - GRPC.logger.debug("received req: #{batch_result.message}") res = @unmarshal.call(batch_result.message) @readq.push(res) end - rescue StandardError => e - GRPC.logger.warn('bidi: read_loop failed') + GRPC.logger.warn('bidi: read-loop failed') GRPC.logger.warn(e) @readq.push(e) # let each_queued_msg terminate with this error end + GRPC.logger.debug('bidi-read-loop: finished') end end end diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 2fd61c5f7e..dd90d8d91d 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -137,6 +137,7 @@ module GRPC def send_status(active_client, code, details, **kw) details = 'Not sure why' if details.nil? + GRPC.logger.debug("Sending status #{code}:#{details}") active_client.send_status(code, details, code == OK, **kw) rescue StandardError => e GRPC.logger.warn("Could not send status #{code}:#{details}") |