diff options
author | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2015-04-06 20:27:00 +0200 |
---|---|---|
committer | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2015-04-06 20:27:00 +0200 |
commit | d81684f19148665116f75d8decf9b74b0a9ce423 (patch) | |
tree | 6dd5e05cd83c42dbe4cc72bdf15c1c03e19fc6b9 /src | |
parent | ff2828be3dcb22f09d05117eaa1dddea17703ecf (diff) | |
parent | fdd65f325d19ae2ffd6c95f7a7a26bee84552fce (diff) |
Merge branch 'master' of github.com:google/grpc into the-purge
Diffstat (limited to 'src')
45 files changed, 703 insertions, 284 deletions
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index d32213f7d5..72149bc4e3 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -354,6 +354,7 @@ bool PrintStubFactory(const grpc::string& package_qualified_service_name, "Service", service->name(), }); out->Print(dict, "def early_adopter_create_$Service$_stub(host, port," + " metadata_transformer=None," " secure=False, root_certificates=None, private_key=None," " certificate_chain=None, server_host_override=None):\n"); { @@ -423,7 +424,8 @@ bool PrintStubFactory(const grpc::string& package_qualified_service_name, out->Print( "return implementations.stub(" "\"$PackageQualifiedServiceName$\"," - " method_invocation_descriptions, host, port, secure=secure," + " method_invocation_descriptions, host, port," + " metadata_transformer=metadata_transformer, secure=secure," " root_certificates=root_certificates, private_key=private_key," " certificate_chain=certificate_chain," " server_host_override=server_host_override)\n", diff --git a/src/core/support/thd.c b/src/core/support/thd.c new file mode 100644 index 0000000000..ec308f3119 --- /dev/null +++ b/src/core/support/thd.c @@ -0,0 +1,66 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Posix implementation for gpr threads. */ + +#include <memory.h> + +#include <grpc/support/thd.h> + +enum { + GPR_THD_JOINABLE = 1 +}; + +gpr_thd_options gpr_thd_options_default(void) { + gpr_thd_options options; + memset(&options, 0, sizeof(options)); + return options; +} + +void gpr_thd_options_set_detached(gpr_thd_options *options) { + options->flags &= ~GPR_THD_JOINABLE; +} + +void gpr_thd_options_set_joinable(gpr_thd_options *options) { + options->flags |= GPR_THD_JOINABLE; +} + +int gpr_thd_options_is_detached(const gpr_thd_options *options) { + if (!options) return 1; + return (options->flags & GPR_THD_JOINABLE) == 0; +} + +int gpr_thd_options_is_joinable(const gpr_thd_options *options) { + if (!options) return 0; + return (options->flags & GPR_THD_JOINABLE) == GPR_THD_JOINABLE; +} diff --git a/src/core/support/thd_posix.c b/src/core/support/thd_posix.c index f50ea58335..7bf527201d 100644 --- a/src/core/support/thd_posix.c +++ b/src/core/support/thd_posix.c @@ -68,7 +68,11 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, a->arg = arg; GPR_ASSERT(pthread_attr_init(&attr) == 0); - GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == 0); + if (gpr_thd_options_is_detached(options)) { + GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == 0); + } else { + GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0); + } thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0); GPR_ASSERT(pthread_attr_destroy(&attr) == 0); if (!thread_started) { @@ -78,14 +82,12 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, return thread_started; } -gpr_thd_options gpr_thd_options_default(void) { - gpr_thd_options options; - memset(&options, 0, sizeof(options)); - return options; -} - gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)pthread_self(); } +void gpr_thd_join(gpr_thd_id t) { + pthread_join(t, NULL); +} + #endif /* GPR_POSIX_SYNC */ diff --git a/src/core/support/thd_win32.c b/src/core/support/thd_win32.c index 347cad57e3..f92fb64a5c 100644 --- a/src/core/support/thd_win32.c +++ b/src/core/support/thd_win32.c @@ -31,7 +31,7 @@ * */ -/* Posix implementation for gpr threads. */ +/* Windows implementation for gpr threads. */ #include <grpc/support/port_platform.h> @@ -40,47 +40,81 @@ #include <windows.h> #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include <grpc/support/thd.h> -struct thd_arg { +#if defined(_MSC_VER) +#define thread_local __declspec(thread) +#elif defined(__GNUC__) +#define thread_local __thread +#else +#error "Unknown compiler - please file a bug report" +#endif + +struct thd_info { void (*body)(void *arg); /* body of a thread */ void *arg; /* argument to a thread */ + HANDLE join_event; /* if joinable, the join event */ + int joinable; /* true if not detached */ }; +static thread_local struct thd_info *g_thd_info; + +/* Destroys a thread info */ +static destroy_thread(struct thd_info *t) { + if (t->joinable) CloseHandle(t->join_event); + gpr_free(t); +} + /* Body of every thread started via gpr_thd_new. */ static DWORD WINAPI thread_body(void *v) { - struct thd_arg a = *(struct thd_arg *)v; - gpr_free(v); - (*a.body)(a.arg); + g_thd_info = (struct thd_info *)v; + g_thd_info->body(g_thd_info->arg); + if (g_thd_info->joinable) { + BOOL ret = SetEvent(g_thd_info->join_event); + GPR_ASSERT(ret); + } else { + destroy_thread(g_thd_info); + } return 0; } int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, const gpr_thd_options *options) { HANDLE handle; - DWORD thread_id; - struct thd_arg *a = gpr_malloc(sizeof(*a)); - a->body = thd_body; - a->arg = arg; + struct thd_info *info = gpr_malloc(sizeof(*info)); + info->body = thd_body; + info->arg = arg; *t = 0; - handle = CreateThread(NULL, 64 * 1024, thread_body, a, 0, &thread_id); + if (gpr_thd_options_is_joinable(options)) { + info->joinable = 1; + info->join_event = CreateEvent(NULL, FALSE, FALSE, NULL); + if (info->join_event == NULL) { + gpr_free(info); + return 0; + } + } else { + info->joinable = 0; + } + handle = CreateThread(NULL, 64 * 1024, thread_body, info, 0, NULL); if (handle == NULL) { - gpr_free(a); + destroy_thread(info); } else { - CloseHandle(handle); /* threads are "detached" */ + *t = (gpr_thd_id)info; + CloseHandle(handle); } - *t = (gpr_thd_id)thread_id; return handle != NULL; } -gpr_thd_options gpr_thd_options_default(void) { - gpr_thd_options options; - memset(&options, 0, sizeof(options)); - return options; +gpr_thd_id gpr_thd_currentid(void) { + return (gpr_thd_id)g_thd_info; } -gpr_thd_id gpr_thd_currentid(void) { - return (gpr_thd_id)GetCurrentThreadId(); +void gpr_thd_join(gpr_thd_id t) { + struct thd_info *info = (struct thd_info *)t; + DWORD ret = WaitForSingleObject(info->join_event, INFINITE); + GPR_ASSERT(ret == WAIT_OBJECT_0); + destroy_thread(info); } #endif /* GPR_WIN32 */ diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 1c15716fad..066cc263a1 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -97,7 +97,7 @@ static void internal_string_ref(internal_string *s); static void internal_string_unref(internal_string *s); static void discard_metadata(grpc_mdctx *ctx); static void gc_mdtab(grpc_mdctx *ctx); -static void metadata_context_destroy(grpc_mdctx *ctx); +static void metadata_context_destroy_locked(grpc_mdctx *ctx); static void lock(grpc_mdctx *ctx) { gpr_mu_lock(&ctx->mu); } @@ -122,8 +122,7 @@ static void unlock(grpc_mdctx *ctx) { discard_metadata(ctx); } if (ctx->strtab_count == 0) { - gpr_mu_unlock(&ctx->mu); - metadata_context_destroy(ctx); + metadata_context_destroy_locked(ctx); return; } } @@ -185,8 +184,7 @@ static void discard_metadata(grpc_mdctx *ctx) { } } -static void metadata_context_destroy(grpc_mdctx *ctx) { - gpr_mu_lock(&ctx->mu); +static void metadata_context_destroy_locked(grpc_mdctx *ctx) { GPR_ASSERT(ctx->strtab_count == 0); GPR_ASSERT(ctx->mdtab_count == 0); GPR_ASSERT(ctx->mdtab_free == 0); diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs index f5956bd33e..ca7683d399 100644 --- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs +++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs @@ -46,11 +46,15 @@ namespace math MathGrpc.IMathServiceClient stub = new MathGrpc.MathServiceClientStub(channel); MathExamples.DivExample(stub); - MathExamples.FibExample(stub); + MathExamples.DivAsyncExample(stub).Wait(); - MathExamples.SumExample(stub); + MathExamples.FibExample(stub).Wait(); - MathExamples.DivManyExample(stub); + MathExamples.SumExample(stub).Wait(); + + MathExamples.DivManyExample(stub).Wait(); + + MathExamples.DependendRequestsExample(stub).Wait(); } GrpcEnvironment.Shutdown(); diff --git a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj index cf5a640079..f9c1caf700 100644 --- a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj +++ b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj @@ -37,6 +37,18 @@ <Reference Include="Google.ProtocolBuffers"> <HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath> </Reference> + <Reference Include="System.Reactive.Interfaces"> + <HintPath>..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll</HintPath> + </Reference> + <Reference Include="System.Reactive.Core"> + <HintPath>..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll</HintPath> + </Reference> + <Reference Include="System.Reactive.Linq"> + <HintPath>..\packages\Rx-Linq.2.2.5\lib\net45\System.Reactive.Linq.dll</HintPath> + </Reference> + <Reference Include="System.Reactive.PlatformServices"> + <HintPath>..\packages\Rx-PlatformServices.2.2.5\lib\net45\System.Reactive.PlatformServices.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="Properties\AssemblyInfo.cs" /> diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs index 85f213cb39..fa5d6688a6 100644 --- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs +++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; using Grpc.Core; @@ -120,14 +121,12 @@ namespace math.Tests [Test] public void Sum() { - var res = client.Sum(); - foreach (var num in new long[] { 10, 20, 30 }) - { - res.Inputs.OnNext(Num.CreateBuilder().SetNum_(num).Build()); - } - res.Inputs.OnCompleted(); + var clientStreamingResult = client.Sum(); + var numList = new List<long> { 10, 20, 30 }.ConvertAll( + n => Num.CreateBuilder().SetNum_(n).Build()); + numList.Subscribe(clientStreamingResult.Inputs); - Assert.AreEqual(60, res.Task.Result.Num_); + Assert.AreEqual(60, clientStreamingResult.Task.Result.Num_); } [Test] @@ -142,13 +141,7 @@ namespace math.Tests var recorder = new RecordingObserver<DivReply>(); var requestObserver = client.DivMany(recorder); - - foreach (var arg in divArgsList) - { - requestObserver.OnNext(arg); - } - requestObserver.OnCompleted(); - + divArgsList.Subscribe(requestObserver); var result = recorder.ToList().Result; CollectionAssert.AreEqual(new long[] { 3, 4, 3 }, result.ConvertAll((divReply) => divReply.Quotient)); diff --git a/src/csharp/Grpc.Examples.Tests/packages.config b/src/csharp/Grpc.Examples.Tests/packages.config index 51c17bcd5e..06c5e6a4eb 100644 --- a/src/csharp/Grpc.Examples.Tests/packages.config +++ b/src/csharp/Grpc.Examples.Tests/packages.config @@ -1,5 +1,10 @@ -<?xml version="1.0" encoding="utf-8"?> -<packages> - <package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" /> - <package id="NUnit" version="2.6.4" targetFramework="net45" /> +<?xml version="1.0" encoding="utf-8"?>
+<packages>
+ <package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
+ <package id="NUnit" version="2.6.4" targetFramework="net45" />
+ <package id="Rx-Core" version="2.2.5" targetFramework="net45" />
+ <package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" />
+ <package id="Rx-Linq" version="2.2.5" targetFramework="net45" />
+ <package id="Rx-Main" version="2.2.5" targetFramework="net45" />
+ <package id="Rx-PlatformServices" version="2.2.5" targetFramework="net45" />
</packages>
\ No newline at end of file diff --git a/src/csharp/Grpc.Examples/MathExamples.cs b/src/csharp/Grpc.Examples/MathExamples.cs index b8bb7eacbd..032372b2a1 100644 --- a/src/csharp/Grpc.Examples/MathExamples.cs +++ b/src/csharp/Grpc.Examples/MathExamples.cs @@ -45,51 +45,45 @@ namespace math Console.WriteLine("Div Result: " + result); } - public static void DivAsyncExample(MathGrpc.IMathServiceClient stub) + public static async Task DivAsyncExample(MathGrpc.IMathServiceClient stub) { - Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); - DivReply result = call.Result; - Console.WriteLine(result); + Task<DivReply> resultTask = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); + DivReply result = await resultTask; + Console.WriteLine("DivAsync Result: " + result); } - public static void DivAsyncWithCancellationExample(MathGrpc.IMathServiceClient stub) + public static async Task DivAsyncWithCancellationExample(MathGrpc.IMathServiceClient stub) { - Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); - DivReply result = call.Result; + Task<DivReply> resultTask = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); + DivReply result = await resultTask; Console.WriteLine(result); } - public static void FibExample(MathGrpc.IMathServiceClient stub) + public static async Task FibExample(MathGrpc.IMathServiceClient stub) { var recorder = new RecordingObserver<Num>(); stub.Fib(new FibArgs.Builder { Limit = 5 }.Build(), recorder); - - List<Num> numbers = recorder.ToList().Result; - Console.WriteLine("Fib Result: " + string.Join("|", recorder.ToList().Result)); + List<Num> result = await recorder.ToList(); + Console.WriteLine("Fib Result: " + string.Join("|", result)); } - public static void SumExample(MathGrpc.IMathServiceClient stub) + public static async Task SumExample(MathGrpc.IMathServiceClient stub) { - List<Num> numbers = new List<Num> + var numbers = new List<Num> { new Num.Builder { Num_ = 1 }.Build(), new Num.Builder { Num_ = 2 }.Build(), new Num.Builder { Num_ = 3 }.Build() }; - var res = stub.Sum(); - foreach (var num in numbers) - { - res.Inputs.OnNext(num); - } - res.Inputs.OnCompleted(); - - Console.WriteLine("Sum Result: " + res.Task.Result); + var clientStreamingResult = stub.Sum(); + numbers.Subscribe(clientStreamingResult.Inputs); + Console.WriteLine("Sum Result: " + await clientStreamingResult.Task); } - public static void DivManyExample(MathGrpc.IMathServiceClient stub) + public static async Task DivManyExample(MathGrpc.IMathServiceClient stub) { - List<DivArgs> divArgsList = new List<DivArgs> + var divArgsList = new List<DivArgs> { new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build(), new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(), @@ -97,26 +91,27 @@ namespace math }; var recorder = new RecordingObserver<DivReply>(); - var inputs = stub.DivMany(recorder); - foreach (var input in divArgsList) - { - inputs.OnNext(input); - } - inputs.OnCompleted(); - - Console.WriteLine("DivMany Result: " + string.Join("|", recorder.ToList().Result)); + divArgsList.Subscribe(inputs); + var result = await recorder.ToList(); + Console.WriteLine("DivMany Result: " + string.Join("|", result)); } - public static void DependendRequestsExample(MathGrpc.IMathServiceClient stub) + public static async Task DependendRequestsExample(MathGrpc.IMathServiceClient stub) { - var numberList = new List<Num> + var numbers = new List<Num> { - new Num.Builder { Num_ = 1 }.Build(), - new Num.Builder { Num_ = 2 }.Build(), new Num.Builder { Num_ = 3 }.Build() + new Num.Builder { Num_ = 1 }.Build(), + new Num.Builder { Num_ = 2 }.Build(), + new Num.Builder { Num_ = 3 }.Build() }; - numberList.ToObservable(); + var clientStreamingResult = stub.Sum(); + numbers.Subscribe(clientStreamingResult.Inputs); + Num sum = await clientStreamingResult.Task; + + DivReply result = await stub.DivAsync(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numbers.Count }.Build()); + Console.WriteLine("Avg Result: " + result); } } } diff --git a/src/node/binding.gyp b/src/node/binding.gyp index 7ef3bdf4bd..83f72fabca 100644 --- a/src/node/binding.gyp +++ b/src/node/binding.gyp @@ -18,12 +18,29 @@ ], 'link_settings': { 'libraries': [ - '-lrt', '-lpthread', '-lgrpc', '-lgpr' - ], + ] }, + "conditions": [ + ['OS == "mac"', { + 'xcode_settings': { + 'MACOSX_DEPLOYMENT_TARGET': '10.9', + 'OTHER_CFLAGS': [ + '-std=c++11', + '-stdlib=libc++' + ] + } + }], + ['OS != "mac"', { + 'link_settings': { + 'libraries': [ + '-lrt' + ] + } + }] + ], "target_name": "grpc", "sources": [ "ext/byte_buffer.cc", diff --git a/src/node/ext/byte_buffer.cc b/src/node/ext/byte_buffer.cc index 82b54b518c..01bd92ea52 100644 --- a/src/node/ext/byte_buffer.cc +++ b/src/node/ext/byte_buffer.cc @@ -32,7 +32,6 @@ */ #include <string.h> -#include <malloc.h> #include <node.h> #include <nan.h> diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc index 787e274973..d37bf763dd 100644 --- a/src/node/ext/channel.cc +++ b/src/node/ext/channel.cc @@ -31,8 +31,6 @@ * */ -#include <malloc.h> - #include <vector> #include <node.h> diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index e47bac833b..3c2396b810 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -38,8 +38,6 @@ #include <node.h> #include <nan.h> -#include <malloc.h> - #include <vector> #include "grpc/grpc.h" #include "grpc/grpc_security.h" diff --git a/src/node/src/client.js b/src/node/src/client.js index c46f7d0526..fad369c2f8 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -241,13 +241,13 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { callback(err); return; } + emitter.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); error.code = response.status.code; callback(error); return; } - emitter.emit('status', response.status); emitter.emit('metadata', response.metadata); callback(null, deserialize(response.read)); }); @@ -312,13 +312,13 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { callback(err); return; } + stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); error.code = response.status.code; callback(error); return; } - stream.emit('status', response.status); callback(null, deserialize(response.read)); }); }); diff --git a/src/node/src/server.js b/src/node/src/server.js index 8a26a43606..05de16294d 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -70,6 +70,9 @@ function handleError(call, error) { status.details = error.details; } } + if (error.hasOwnProperty('metadata')) { + status.metadata = error.metadata; + } var error_batch = {}; error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; call.startBatch(error_batch, function(){}); @@ -102,15 +105,20 @@ function waitForCancel(call, emitter) { * @param {*} value The value to respond with * @param {function(*):Buffer=} serialize Serialization function for the * response + * @param {Object=} metadata Optional trailing metadata to send with status */ -function sendUnaryResponse(call, value, serialize) { +function sendUnaryResponse(call, value, serialize, metadata) { var end_batch = {}; - end_batch[grpc.opType.SEND_MESSAGE] = serialize(value); - end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + var status = { code: grpc.status.OK, details: 'OK', metadata: {} }; + if (metadata) { + status.metadata = metadata; + } + end_batch[grpc.opType.SEND_MESSAGE] = serialize(value); + end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; call.startBatch(end_batch, function (){}); } @@ -143,6 +151,7 @@ function setUpWritable(stream, serialize) { function setStatus(err) { var code = grpc.status.INTERNAL; var details = 'Unknown Error'; + var metadata = {}; if (err.hasOwnProperty('message')) { details = err.message; } @@ -152,7 +161,10 @@ function setUpWritable(stream, serialize) { details = err.details; } } - stream.status = {code: code, details: details, metadata: {}}; + if (err.hasOwnProperty('metadata')) { + metadata = err.metadata; + } + stream.status = {code: code, details: details, metadata: metadata}; } /** * Terminate the call. This includes indicating that reads are done, draining @@ -166,6 +178,17 @@ function setUpWritable(stream, serialize) { stream.end(); } stream.on('error', terminateCall); + /** + * Override of Writable#end method that allows for sending metadata with a + * success status. + * @param {Object=} metadata Metadata to send with the status + */ + stream.end = function(metadata) { + if (metadata) { + stream.status.metadata = metadata; + } + Writable.prototype.end.call(this); + }; } /** @@ -335,11 +358,13 @@ function handleUnary(call, handler, metadata) { if (emitter.cancelled) { return; } - handler.func(emitter, function sendUnaryData(err, value) { + handler.func(emitter, function sendUnaryData(err, value, trailer) { if (err) { + err.metadata = trailer; handleError(call, err); + } else { + sendUnaryResponse(call, value, handler.serialize, trailer); } - sendUnaryResponse(call, value, handler.serialize); }); }); } @@ -378,12 +403,14 @@ function handleClientStreaming(call, handler, metadata) { var metadata_batch = {}; metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; call.startBatch(metadata_batch, function() {}); - handler.func(stream, function(err, value) { + handler.func(stream, function(err, value, trailer) { stream.terminate(); if (err) { + err.metadata = trailer; handleError(call, err); + } else { + sendUnaryResponse(call, value, handler.serialize, trailer); } - sendUnaryResponse(call, value, handler.serialize); }); } diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 96b47815e1..590c644c71 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -126,6 +126,167 @@ describe('Generic client and server', function() { }); }); }); +describe('Trailing metadata', function() { + var client; + var server; + before(function() { + var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); + var test_service = test_proto.lookup('TestService'); + var Server = grpc.buildServer([test_service]); + server = new Server({ + TestService: { + unary: function(call, cb) { + var req = call.request; + if (req.error) { + cb(new Error('Requested error'), null, {metadata: ['yes']}); + } else { + cb(null, {count: 1}, {metadata: ['yes']}); + } + }, + clientStream: function(stream, cb){ + var count = 0; + var errored; + stream.on('data', function(data) { + if (data.error) { + errored = true; + cb(new Error('Requested error'), null, {metadata: ['yes']}); + } else { + count += 1; + } + }); + stream.on('end', function() { + if (!errored) { + cb(null, {count: count}, {metadata: ['yes']}); + } + }); + }, + serverStream: function(stream) { + var req = stream.request; + if (req.error) { + var err = new Error('Requested error'); + err.metadata = {metadata: ['yes']}; + stream.emit('error', err); + } else { + for (var i = 0; i < 5; i++) { + stream.write({count: i}); + } + stream.end({metadata: ['yes']}); + } + }, + bidiStream: function(stream) { + var count = 0; + stream.on('data', function(data) { + if (data.error) { + var err = new Error('Requested error'); + err.metadata = { + metadata: ['yes'], + count: ['' + count] + }; + stream.emit('error', err); + } else { + stream.write({count: count}); + count += 1; + } + }); + stream.on('end', function() { + stream.end({metadata: ['yes']}); + }); + } + } + }); + var port = server.bind('localhost:0'); + var Client = surface_client.makeProtobufClientConstructor(test_service); + client = new Client('localhost:' + port); + server.listen(); + }); + after(function() { + server.shutdown(); + }); + it('should be present when a unary call succeeds', function(done) { + var call = client.unary({error: false}, function(err, data) { + assert.ifError(err); + }); + call.on('status', function(status) { + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a unary call fails', function(done) { + var call = client.unary({error: true}, function(err, data) { + assert(err); + }); + call.on('status', function(status) { + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a client stream call succeeds', function(done) { + var call = client.clientStream(function(err, data) { + assert.ifError(err); + }); + call.write({error: false}); + call.write({error: false}); + call.end(); + call.on('status', function(status) { + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a client stream call fails', function(done) { + var call = client.clientStream(function(err, data) { + assert(err); + }); + call.write({error: false}); + call.write({error: true}); + call.end(); + call.on('status', function(status) { + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a server stream call succeeds', function(done) { + var call = client.serverStream({error: false}); + call.on('data', function(){}); + call.on('status', function(status) { + assert.strictEqual(status.code, grpc.status.OK); + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a server stream call fails', function(done) { + var call = client.serverStream({error: true}); + call.on('data', function(){}); + call.on('status', function(status) { + assert.notStrictEqual(status.code, grpc.status.OK); + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a bidi stream succeeds', function(done) { + var call = client.bidiStream(); + call.write({error: false}); + call.write({error: false}); + call.end(); + call.on('data', function(){}); + call.on('status', function(status) { + assert.strictEqual(status.code, grpc.status.OK); + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); + it('should be present when a bidi stream fails', function(done) { + var call = client.bidiStream(); + call.write({error: false}); + call.write({error: true}); + call.end(); + call.on('data', function(){}); + call.on('status', function(status) { + assert.notStrictEqual(status.code, grpc.status.OK); + assert.deepEqual(status.metadata.metadata, ['yes']); + done(); + }); + }); +}); describe('Cancelling surface client', function() { var client; var server; diff --git a/src/node/test/test_service.proto b/src/node/test/test_service.proto new file mode 100644 index 0000000000..5d3d891841 --- /dev/null +++ b/src/node/test/test_service.proto @@ -0,0 +1,52 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto2"; + +message Request { + optional bool error = 1; +} + +message Response { + optional int32 count = 1; +} + +service TestService { + rpc Unary (Request) returns (Response) { + } + + rpc ClientStream (stream Request) returns (Response) { + } + + rpc ServerStream (Request) returns (stream Response) { + } + + rpc BidiStream (stream Request) returns (stream Response) { + } +}
\ No newline at end of file diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index 6e83e79a4f..6bc65b5367 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -263,7 +263,7 @@ PHP_METHOD(Call, __construct) { * @param array batch Array of actions to take * @return object Object with results of all actions */ -PHP_METHOD(Call, start_batch) { +PHP_METHOD(Call, startBatch) { wrapped_grpc_call *call = (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC); grpc_op ops[8]; @@ -494,7 +494,7 @@ PHP_METHOD(Call, cancel) { static zend_function_entry call_methods[] = { PHP_ME(Call, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) - PHP_ME(Call, start_batch, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Call, startBatch, NULL, ZEND_ACC_PUBLIC) PHP_ME(Call, cancel, NULL, ZEND_ACC_PUBLIC) PHP_FE_END}; void grpc_init_call(TSRMLS_D) { diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c index 46fe745c5a..dbb9425619 100644 --- a/src/php/ext/grpc/server.c +++ b/src/php/ext/grpc/server.c @@ -133,7 +133,7 @@ PHP_METHOD(Server, __construct) { * @param long $tag_cancel The tag to use if the call is cancelled * @return Void */ -PHP_METHOD(Server, request_call) { +PHP_METHOD(Server, requestCall) { grpc_call_error error_code; wrapped_grpc_server *server = (wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC); @@ -178,7 +178,7 @@ cleanup: * @param string $addr The address to add * @return true on success, false on failure */ -PHP_METHOD(Server, add_http2_port) { +PHP_METHOD(Server, addHttp2Port) { wrapped_grpc_server *server = (wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC); const char *addr; @@ -193,7 +193,7 @@ PHP_METHOD(Server, add_http2_port) { RETURN_LONG(grpc_server_add_http2_port(server->wrapped, addr)); } -PHP_METHOD(Server, add_secure_http2_port) { +PHP_METHOD(Server, addSecureHttp2Port) { wrapped_grpc_server *server = (wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC); const char *addr; @@ -227,9 +227,9 @@ PHP_METHOD(Server, start) { static zend_function_entry server_methods[] = { PHP_ME(Server, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) - PHP_ME(Server, request_call, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Server, add_http2_port, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Server, add_secure_http2_port, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Server, requestCall, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Server, addHttp2Port, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Server, addSecureHttp2Port, NULL, ZEND_ACC_PUBLIC) PHP_ME(Server, start, NULL, ZEND_ACC_PUBLIC) PHP_FE_END}; void grpc_init_server(TSRMLS_D) { diff --git a/src/php/ext/grpc/timeval.c b/src/php/ext/grpc/timeval.c index 1c9542dbff..8a278d6760 100644 --- a/src/php/ext/grpc/timeval.c +++ b/src/php/ext/grpc/timeval.c @@ -227,7 +227,7 @@ PHP_METHOD(Timeval, zero) { * Returns the infinite future time value as a timeval object * @return Timeval Infinite future time value */ -PHP_METHOD(Timeval, inf_future) { +PHP_METHOD(Timeval, infFuture) { zval *grpc_php_timeval_inf_future = grpc_php_wrap_timeval(gpr_inf_future); RETURN_DESTROY_ZVAL(grpc_php_timeval_inf_future); } @@ -236,7 +236,7 @@ PHP_METHOD(Timeval, inf_future) { * Returns the infinite past time value as a timeval object * @return Timeval Infinite past time value */ -PHP_METHOD(Timeval, inf_past) { +PHP_METHOD(Timeval, infPast) { zval *grpc_php_timeval_inf_past = grpc_php_wrap_timeval(gpr_inf_past); RETURN_DESTROY_ZVAL(grpc_php_timeval_inf_past); } @@ -245,7 +245,7 @@ PHP_METHOD(Timeval, inf_past) { * Sleep until this time, interpreted as an absolute timeout * @return void */ -PHP_METHOD(Timeval, sleep_until) { +PHP_METHOD(Timeval, sleepUntil) { wrapped_grpc_timeval *this = (wrapped_grpc_timeval *)zend_object_store_get_object(getThis() TSRMLS_CC); gpr_sleep_until(this->wrapped); @@ -255,11 +255,11 @@ static zend_function_entry timeval_methods[] = { PHP_ME(Timeval, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) PHP_ME(Timeval, add, NULL, ZEND_ACC_PUBLIC) PHP_ME(Timeval, compare, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) - PHP_ME(Timeval, inf_future, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) - PHP_ME(Timeval, inf_past, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) + PHP_ME(Timeval, infFuture, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) + PHP_ME(Timeval, infPast, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_ME(Timeval, now, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_ME(Timeval, similar, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) - PHP_ME(Timeval, sleep_until, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Timeval, sleepUntil, NULL, ZEND_ACC_PUBLIC) PHP_ME(Timeval, subtract, NULL, ZEND_ACC_PUBLIC) PHP_ME(Timeval, zero, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_FE_END}; diff --git a/src/php/lib/Grpc/AbstractCall.php b/src/php/lib/Grpc/AbstractCall.php index 413d5966e1..1add972589 100644 --- a/src/php/lib/Grpc/AbstractCall.php +++ b/src/php/lib/Grpc/AbstractCall.php @@ -45,7 +45,7 @@ abstract class AbstractCall { * @param string $method The method to call on the remote server */ public function __construct(Channel $channel, $method, $deserialize) { - $this->call = new Call($channel, $method, Timeval::inf_future()); + $this->call = new Call($channel, $method, Timeval::infFuture()); $this->deserialize = $deserialize; $this->metadata = null; } diff --git a/src/php/lib/Grpc/BidiStreamingCall.php b/src/php/lib/Grpc/BidiStreamingCall.php index 2afceafce9..76c642bef4 100644 --- a/src/php/lib/Grpc/BidiStreamingCall.php +++ b/src/php/lib/Grpc/BidiStreamingCall.php @@ -43,7 +43,7 @@ class BidiStreamingCall extends AbstractCall { * @param array $metadata Metadata to send with the call, if applicable */ public function start($metadata) { - $this->call->start_batch([OP_SEND_INITIAL_METADATA => $metadata]); + $this->call->startBatch([OP_SEND_INITIAL_METADATA => $metadata]); } /** @@ -55,7 +55,7 @@ class BidiStreamingCall extends AbstractCall { if ($this->metadata === null) { $batch[OP_RECV_INITIAL_METADATA] = true; } - $read_event = $this->call->start_batch($batch); + $read_event = $this->call->startBatch($batch); if ($this->metadata === null) { $this->metadata = $read_event->metadata; } @@ -68,14 +68,14 @@ class BidiStreamingCall extends AbstractCall { * @param ByteBuffer $data The data to write */ public function write($data) { - $this->call->start_batch([OP_SEND_MESSAGE => $data->serialize()]); + $this->call->startBatch([OP_SEND_MESSAGE => $data->serialize()]); } /** * Indicate that no more writes will be sent. */ public function writesDone() { - $this->call->start_batch([OP_SEND_CLOSE_FROM_CLIENT => true]); + $this->call->startBatch([OP_SEND_CLOSE_FROM_CLIENT => true]); } /** @@ -84,7 +84,7 @@ class BidiStreamingCall extends AbstractCall { * and array $metadata members */ public function getStatus() { - $status_event = $this->call->start_batch([ + $status_event = $this->call->startBatch([ OP_RECV_STATUS_ON_CLIENT => true ]); return $status_event->status; diff --git a/src/php/lib/Grpc/ClientStreamingCall.php b/src/php/lib/Grpc/ClientStreamingCall.php index ec585da985..61439d3f47 100644 --- a/src/php/lib/Grpc/ClientStreamingCall.php +++ b/src/php/lib/Grpc/ClientStreamingCall.php @@ -44,11 +44,11 @@ class ClientStreamingCall extends AbstractCall { * @param array $metadata Metadata to send with the call, if applicable */ public function start($arg_iter, $metadata = array()) { - $event = $this->call->start_batch([OP_SEND_INITIAL_METADATA => $metadata]); + $event = $this->call->startBatch([OP_SEND_INITIAL_METADATA => $metadata]); foreach($arg_iter as $arg) { - $this->call->start_batch([OP_SEND_MESSAGE => $arg->serialize()]); + $this->call->startBatch([OP_SEND_MESSAGE => $arg->serialize()]); } - $this->call->start_batch([OP_SEND_CLOSE_FROM_CLIENT => true]); + $this->call->startBatch([OP_SEND_CLOSE_FROM_CLIENT => true]); } /** @@ -56,7 +56,7 @@ class ClientStreamingCall extends AbstractCall { * @return [response data, status] */ public function wait() { - $event = $this->call->start_batch([ + $event = $this->call->startBatch([ OP_RECV_INITIAL_METADATA => true, OP_RECV_MESSAGE => true, OP_RECV_STATUS_ON_CLIENT => true]); diff --git a/src/php/lib/Grpc/ServerStreamingCall.php b/src/php/lib/Grpc/ServerStreamingCall.php index 574c1bb1e0..631c863345 100644 --- a/src/php/lib/Grpc/ServerStreamingCall.php +++ b/src/php/lib/Grpc/ServerStreamingCall.php @@ -44,7 +44,7 @@ class ServerStreamingCall extends AbstractCall { * @param array $metadata Metadata to send with the call, if applicable */ public function start($arg, $metadata = array()) { - $event = $this->call->start_batch([ + $event = $this->call->startBatch([ OP_SEND_INITIAL_METADATA => $metadata, OP_RECV_INITIAL_METADATA => true, OP_SEND_MESSAGE => $arg->serialize(), @@ -56,10 +56,10 @@ class ServerStreamingCall extends AbstractCall { * @return An iterator of response values */ public function responses() { - $response = $this->call->start_batch([OP_RECV_MESSAGE => true])->message; + $response = $this->call->startBatch([OP_RECV_MESSAGE => true])->message; while($response !== null) { yield $this->deserializeResponse($response); - $response = $this->call->start_batch([OP_RECV_MESSAGE => true])->message; + $response = $this->call->startBatch([OP_RECV_MESSAGE => true])->message; } } @@ -69,7 +69,7 @@ class ServerStreamingCall extends AbstractCall { * and array $metadata members */ public function getStatus() { - $status_event = $this->call->start_batch([ + $status_event = $this->call->startBatch([ OP_RECV_STATUS_ON_CLIENT => true ]); return $status_event->status; diff --git a/src/php/lib/Grpc/UnaryCall.php b/src/php/lib/Grpc/UnaryCall.php index 814d477697..97a10a40f4 100644 --- a/src/php/lib/Grpc/UnaryCall.php +++ b/src/php/lib/Grpc/UnaryCall.php @@ -44,7 +44,7 @@ class UnaryCall extends AbstractCall { * @param array $metadata Metadata to send with the call, if applicable */ public function start($arg, $metadata = array()) { - $event = $this->call->start_batch([ + $event = $this->call->startBatch([ OP_SEND_INITIAL_METADATA => $metadata, OP_RECV_INITIAL_METADATA => true, OP_SEND_MESSAGE => $arg->serialize(), @@ -57,7 +57,7 @@ class UnaryCall extends AbstractCall { * @return [response data, status] */ public function wait() { - $event = $this->call->start_batch([ + $event = $this->call->startBatch([ OP_RECV_MESSAGE => true, OP_RECV_STATUS_ON_CLIENT => true]); return array($this->deserializeResponse($event->message), $event->status); diff --git a/src/php/tests/generated_code/GeneratedCodeTest.php b/src/php/tests/generated_code/GeneratedCodeTest.php index afd7f21de4..927d24ca63 100755 --- a/src/php/tests/generated_code/GeneratedCodeTest.php +++ b/src/php/tests/generated_code/GeneratedCodeTest.php @@ -41,7 +41,8 @@ class GeneratedCodeTest extends PHPUnit_Framework_TestCase { protected static $client; protected static $timeout; public static function setUpBeforeClass() { - self::$client = new math\MathClient(getenv('GRPC_TEST_HOST')); + self::$client = new math\MathClient(new Grpc\BaseStub( + getenv('GRPC_TEST_HOST'), [])); } public function testSimpleRequest() { diff --git a/src/php/tests/unit_tests/CallTest.php b/src/php/tests/unit_tests/CallTest.php index d361ce0030..77a2d86ce4 100755 --- a/src/php/tests/unit_tests/CallTest.php +++ b/src/php/tests/unit_tests/CallTest.php @@ -37,21 +37,21 @@ class CallTest extends PHPUnit_Framework_TestCase{ public static function setUpBeforeClass() { self::$server = new Grpc\Server([]); - self::$port = self::$server->add_http2_port('0.0.0.0:0'); + self::$port = self::$server->addHttp2Port('0.0.0.0:0'); } public function setUp() { $this->channel = new Grpc\Channel('localhost:' . self::$port, []); $this->call = new Grpc\Call($this->channel, '/foo', - Grpc\Timeval::inf_future()); + Grpc\Timeval::infFuture()); } public function testAddEmptyMetadata() { $batch = [ Grpc\OP_SEND_INITIAL_METADATA => [] ]; - $result = $this->call->start_batch($batch); + $result = $this->call->startBatch($batch); $this->assertTrue($result->send_metadata); } @@ -59,7 +59,7 @@ class CallTest extends PHPUnit_Framework_TestCase{ $batch = [ Grpc\OP_SEND_INITIAL_METADATA => ['key' => ['value']] ]; - $result = $this->call->start_batch($batch); + $result = $this->call->startBatch($batch); $this->assertTrue($result->send_metadata); } @@ -67,7 +67,7 @@ class CallTest extends PHPUnit_Framework_TestCase{ $batch = [ Grpc\OP_SEND_INITIAL_METADATA => ['key' => ['value1', 'value2']] ]; - $result = $this->call->start_batch($batch); + $result = $this->call->startBatch($batch); $this->assertTrue($result->send_metadata); } @@ -76,7 +76,7 @@ class CallTest extends PHPUnit_Framework_TestCase{ Grpc\OP_SEND_INITIAL_METADATA => ['key1' => ['value1'], 'key2' => ['value2', 'value3']] ]; - $result = $this->call->start_batch($batch); + $result = $this->call->startBatch($batch); $this->assertTrue($result->send_metadata); } } diff --git a/src/php/tests/unit_tests/EndToEndTest.php b/src/php/tests/unit_tests/EndToEndTest.php index 3e165b7213..296873fa8f 100755 --- a/src/php/tests/unit_tests/EndToEndTest.php +++ b/src/php/tests/unit_tests/EndToEndTest.php @@ -34,7 +34,7 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ public function setUp() { $this->server = new Grpc\Server([]); - $port = $this->server->add_http2_port('0.0.0.0:0'); + $port = $this->server->addHttp2Port('0.0.0.0:0'); $this->channel = new Grpc\Channel('localhost:' . $port, []); $this->server->start(); } @@ -45,13 +45,13 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ } public function testSimpleRequestBody() { - $deadline = Grpc\Timeval::inf_future(); + $deadline = Grpc\Timeval::infFuture(); $status_text = 'xyz'; $call = new Grpc\Call($this->channel, 'dummy_method', $deadline); - $event = $call->start_batch([ + $event = $call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_CLOSE_FROM_CLIENT => true ]); @@ -59,12 +59,12 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ $this->assertTrue($event->send_metadata); $this->assertTrue($event->send_close); - $event = $this->server->request_call(); + $event = $this->server->requestCall(); $this->assertSame('dummy_method', $event->method); $this->assertSame([], $event->metadata); $server_call = $event->call; - $event = $server_call->start_batch([ + $event = $server_call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_STATUS_FROM_SERVER => [ 'metadata' => [], @@ -78,7 +78,7 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ $this->assertTrue($event->send_status); $this->assertFalse($event->cancelled); - $event = $call->start_batch([ + $event = $call->startBatch([ Grpc\OP_RECV_INITIAL_METADATA => true, Grpc\OP_RECV_STATUS_ON_CLIENT => true ]); @@ -94,7 +94,7 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ } public function testClientServerFullRequestResponse() { - $deadline = Grpc\Timeval::inf_future(); + $deadline = Grpc\Timeval::infFuture(); $req_text = 'client_server_full_request_response'; $reply_text = 'reply:client_server_full_request_response'; $status_text = 'status:client_server_full_response_text'; @@ -103,7 +103,7 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ 'dummy_method', $deadline); - $event = $call->start_batch([ + $event = $call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_CLOSE_FROM_CLIENT => true, Grpc\OP_SEND_MESSAGE => $req_text @@ -113,11 +113,11 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ $this->assertTrue($event->send_close); $this->assertTrue($event->send_message); - $event = $this->server->request_call(); + $event = $this->server->requestCall(); $this->assertSame('dummy_method', $event->method); $server_call = $event->call; - $event = $server_call->start_batch([ + $event = $server_call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_MESSAGE => $reply_text, Grpc\OP_SEND_STATUS_FROM_SERVER => [ @@ -135,7 +135,7 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ $this->assertFalse($event->cancelled); $this->assertSame($req_text, $event->message); - $event = $call->start_batch([ + $event = $call->startBatch([ Grpc\OP_RECV_INITIAL_METADATA => true, Grpc\OP_RECV_MESSAGE => true, Grpc\OP_RECV_STATUS_ON_CLIENT => true, diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php index 2d62fe9d5e..0c18cd3e91 100755 --- a/src/php/tests/unit_tests/SecureEndToEndTest.php +++ b/src/php/tests/unit_tests/SecureEndToEndTest.php @@ -40,8 +40,8 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ file_get_contents(dirname(__FILE__) . '/../data/server1.key'), file_get_contents(dirname(__FILE__) . '/../data/server1.pem')); $this->server = new Grpc\Server(); - $port = $this->server->add_secure_http2_port('0.0.0.0:0', - $server_credentials); + $port = $this->server->addSecureHttp2Port('0.0.0.0:0', + $server_credentials); $this->server->start(); $this->channel = new Grpc\Channel( 'localhost:' . $port, @@ -57,13 +57,13 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ } public function testSimpleRequestBody() { - $deadline = Grpc\Timeval::inf_future(); + $deadline = Grpc\Timeval::infFuture(); $status_text = 'xyz'; $call = new Grpc\Call($this->channel, 'dummy_method', $deadline); - $event = $call->start_batch([ + $event = $call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_CLOSE_FROM_CLIENT => true ]); @@ -71,12 +71,12 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ $this->assertTrue($event->send_metadata); $this->assertTrue($event->send_close); - $event = $this->server->request_call(); + $event = $this->server->requestCall(); $this->assertSame('dummy_method', $event->method); $this->assertSame([], $event->metadata); $server_call = $event->call; - $event = $server_call->start_batch([ + $event = $server_call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_STATUS_FROM_SERVER => [ 'metadata' => [], @@ -90,7 +90,7 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ $this->assertTrue($event->send_status); $this->assertFalse($event->cancelled); - $event = $call->start_batch([ + $event = $call->startBatch([ Grpc\OP_RECV_INITIAL_METADATA => true, Grpc\OP_RECV_STATUS_ON_CLIENT => true ]); @@ -106,7 +106,7 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ } public function testClientServerFullRequestResponse() { - $deadline = Grpc\Timeval::inf_future(); + $deadline = Grpc\Timeval::infFuture(); $req_text = 'client_server_full_request_response'; $reply_text = 'reply:client_server_full_request_response'; $status_text = 'status:client_server_full_response_text'; @@ -115,7 +115,7 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ 'dummy_method', $deadline); - $event = $call->start_batch([ + $event = $call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_CLOSE_FROM_CLIENT => true, Grpc\OP_SEND_MESSAGE => $req_text @@ -125,11 +125,11 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ $this->assertTrue($event->send_close); $this->assertTrue($event->send_message); - $event = $this->server->request_call(); + $event = $this->server->requestCall(); $this->assertSame('dummy_method', $event->method); $server_call = $event->call; - $event = $server_call->start_batch([ + $event = $server_call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_MESSAGE => $reply_text, Grpc\OP_SEND_STATUS_FROM_SERVER => [ @@ -147,7 +147,7 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ $this->assertFalse($event->cancelled); $this->assertSame($req_text, $event->message); - $event = $call->start_batch([ + $event = $call->startBatch([ Grpc\OP_RECV_INITIAL_METADATA => true, Grpc\OP_RECV_MESSAGE => true, Grpc\OP_RECV_STATUS_ON_CLIENT => true, diff --git a/src/php/tests/unit_tests/TimevalTest.php b/src/php/tests/unit_tests/TimevalTest.php index d20069afa1..a8bfcf0ac4 100755 --- a/src/php/tests/unit_tests/TimevalTest.php +++ b/src/php/tests/unit_tests/TimevalTest.php @@ -39,14 +39,14 @@ class TimevalTest extends PHPUnit_Framework_TestCase{ public function testPastIsLessThanZero() { $zero = Grpc\Timeval::zero(); - $past = Grpc\Timeval::inf_past(); + $past = Grpc\Timeval::infPast(); $this->assertLessThan(0, Grpc\Timeval::compare($past, $zero)); $this->assertGreaterThan(0, Grpc\Timeval::compare($zero, $past)); } public function testFutureIsGreaterThanZero() { $zero = Grpc\Timeval::zero(); - $future = Grpc\Timeval::inf_future(); + $future = Grpc\Timeval::infFuture(); $this->assertLessThan(0, Grpc\Timeval::compare($zero, $future)); $this->assertGreaterThan(0, Grpc\Timeval::compare($future, $zero)); } @@ -56,7 +56,7 @@ class TimevalTest extends PHPUnit_Framework_TestCase{ */ public function testNowIsBetweenZeroAndFuture() { $zero = Grpc\Timeval::zero(); - $future = Grpc\Timeval::inf_future(); + $future = Grpc\Timeval::infFuture(); $now = Grpc\Timeval::now(); $this->assertLessThan(0, Grpc\Timeval::compare($zero, $now)); $this->assertLessThan(0, Grpc\Timeval::compare($now, $future)); diff --git a/src/python/interop/interop/empty_pb2.py b/src/python/interop/interop/empty_pb2.py index 732a358a36..8c1ce2f13e 100644 --- a/src/python/interop/interop/empty_pb2.py +++ b/src/python/interop/interop/empty_pb2.py @@ -57,6 +57,7 @@ Empty = _reflection.GeneratedProtocolMessageType('Empty', (_message.Message,), d _sym_db.RegisterMessage(Empty) -from grpc.framework.face import demonstration as _face_testing -from grpc.framework.face import interfaces as _face_interfaces +import abc +from grpc.early_adopter import implementations +from grpc.framework.alpha import utilities # @@protoc_insertion_point(module_scope) diff --git a/src/python/interop/interop/messages_pb2.py b/src/python/interop/interop/messages_pb2.py index d449a99140..0bf3d86a31 100644 --- a/src/python/interop/interop/messages_pb2.py +++ b/src/python/interop/interop/messages_pb2.py @@ -441,6 +441,7 @@ StreamingOutputCallResponse = _reflection.GeneratedProtocolMessageType('Streamin _sym_db.RegisterMessage(StreamingOutputCallResponse) -from grpc.framework.face import demonstration as _face_testing -from grpc.framework.face import interfaces as _face_interfaces +import abc +from grpc.early_adopter import implementations +from grpc.framework.alpha import utilities # @@protoc_insertion_point(module_scope) diff --git a/src/python/interop/interop/test_pb2.py b/src/python/interop/interop/test_pb2.py index e86094611b..71325d5a9f 100644 --- a/src/python/interop/interop/test_pb2.py +++ b/src/python/interop/interop/test_pb2.py @@ -29,121 +29,150 @@ _sym_db.RegisterFileDescriptor(DESCRIPTOR) -from grpc.framework.face import demonstration as _face_testing -from grpc.framework.face import interfaces as _face_interfaces -class TestServiceService(object): +import abc +from grpc.early_adopter import implementations +from grpc.framework.alpha import utilities +class EarlyAdopterTestServiceServicer(object): """<fill me in later!>""" - def __init__(self): - pass -class TestServiceServicer(object): - """<fill me in later!>""" - def EmptyCall(self, arg): + __metaclass__ = abc.ABCMeta + @abc.abstractmethod + def EmptyCall(self, request, context): + raise NotImplementedError() + @abc.abstractmethod + def UnaryCall(self, request, context): raise NotImplementedError() - def UnaryCall(self, arg): + @abc.abstractmethod + def StreamingOutputCall(self, request, context): raise NotImplementedError() - def StreamingOutputCall(self, arg): + @abc.abstractmethod + def StreamingInputCall(self, request_iterator, context): raise NotImplementedError() - def StreamingInputCall(self, arg): + @abc.abstractmethod + def FullDuplexCall(self, request_iterator, context): raise NotImplementedError() - def FullDuplexCall(self, arg): + @abc.abstractmethod + def HalfDuplexCall(self, request_iterator, context): + raise NotImplementedError() +class EarlyAdopterTestServiceServer(object): + """<fill me in later!>""" + __metaclass__ = abc.ABCMeta + @abc.abstractmethod + def start(self): raise NotImplementedError() - def HalfDuplexCall(self, arg): + @abc.abstractmethod + def stop(self): raise NotImplementedError() -class TestServiceStub(object): +class EarlyAdopterTestServiceStub(object): """<fill me in later!>""" - def EmptyCall(self, arg): + __metaclass__ = abc.ABCMeta + @abc.abstractmethod + def EmptyCall(self, request): raise NotImplementedError() EmptyCall.async = None - def UnaryCall(self, arg): + @abc.abstractmethod + def UnaryCall(self, request): raise NotImplementedError() UnaryCall.async = None - def StreamingOutputCall(self, arg): + @abc.abstractmethod + def StreamingOutputCall(self, request): raise NotImplementedError() StreamingOutputCall.async = None - def StreamingInputCall(self, arg): + @abc.abstractmethod + def StreamingInputCall(self, request_iterator): raise NotImplementedError() StreamingInputCall.async = None - def FullDuplexCall(self, arg): + @abc.abstractmethod + def FullDuplexCall(self, request_iterator): raise NotImplementedError() FullDuplexCall.async = None - def HalfDuplexCall(self, arg): + @abc.abstractmethod + def HalfDuplexCall(self, request_iterator): raise NotImplementedError() HalfDuplexCall.async = None -class _TestServiceStub(TestServiceStub): - def __init__(self, face_stub, default_timeout): - self._face_stub = face_stub - self._default_timeout = default_timeout - stub_self = self - class EmptyCall(object): - def __call__(self, arg): - return stub_self._face_stub.blocking_value_in_value_out("EmptyCall", arg, stub_self._default_timeout) - def async(self, arg): - return stub_self._face_stub.future_value_in_value_out("EmptyCall", arg, stub_self._default_timeout) - self.EmptyCall = EmptyCall() - class UnaryCall(object): - def __call__(self, arg): - return stub_self._face_stub.blocking_value_in_value_out("UnaryCall", arg, stub_self._default_timeout) - def async(self, arg): - return stub_self._face_stub.future_value_in_value_out("UnaryCall", arg, stub_self._default_timeout) - self.UnaryCall = UnaryCall() - class StreamingOutputCall(object): - def __call__(self, arg): - return stub_self._face_stub.inline_value_in_stream_out("StreamingOutputCall", arg, stub_self._default_timeout) - def async(self, arg): - return stub_self._face_stub.inline_value_in_stream_out("StreamingOutputCall", arg, stub_self._default_timeout) - self.StreamingOutputCall = StreamingOutputCall() - class StreamingInputCall(object): - def __call__(self, arg): - return stub_self._face_stub.blocking_stream_in_value_out("StreamingInputCall", arg, stub_self._default_timeout) - def async(self, arg): - return stub_self._face_stub.future_stream_in_value_out("StreamingInputCall", arg, stub_self._default_timeout) - self.StreamingInputCall = StreamingInputCall() - class FullDuplexCall(object): - def __call__(self, arg): - return stub_self._face_stub.inline_stream_in_stream_out("FullDuplexCall", arg, stub_self._default_timeout) - def async(self, arg): - return stub_self._face_stub.inline_stream_in_stream_out("FullDuplexCall", arg, stub_self._default_timeout) - self.FullDuplexCall = FullDuplexCall() - class HalfDuplexCall(object): - def __call__(self, arg): - return stub_self._face_stub.inline_stream_in_stream_out("HalfDuplexCall", arg, stub_self._default_timeout) - def async(self, arg): - return stub_self._face_stub.inline_stream_in_stream_out("HalfDuplexCall", arg, stub_self._default_timeout) - self.HalfDuplexCall = HalfDuplexCall() -def mock_TestService(servicer, default_timeout): - value_in_value_out = {} - value_in_stream_out = {} - stream_in_value_out = {} - stream_in_stream_out = {} - class EmptyCall(_face_interfaces.InlineValueInValueOutMethod): - def service(self, request, context): - return servicer.EmptyCall(request) - value_in_value_out['EmptyCall'] = EmptyCall() - class UnaryCall(_face_interfaces.InlineValueInValueOutMethod): - def service(self, request, context): - return servicer.UnaryCall(request) - value_in_value_out['UnaryCall'] = UnaryCall() - class StreamingOutputCall(_face_interfaces.InlineValueInStreamOutMethod): - def service(self, request, context): - return servicer.StreamingOutputCall(request) - value_in_stream_out['StreamingOutputCall'] = StreamingOutputCall() - class StreamingInputCall(_face_interfaces.InlineStreamInValueOutMethod): - def service(self, request, context): - return servicer.StreamingInputCall(request) - stream_in_value_out['StreamingInputCall'] = StreamingInputCall() - class FullDuplexCall(_face_interfaces.InlineStreamInStreamOutMethod): - def service(self, request, context): - return servicer.FullDuplexCall(request) - stream_in_stream_out['FullDuplexCall'] = FullDuplexCall() - class HalfDuplexCall(_face_interfaces.InlineStreamInStreamOutMethod): - def service(self, request, context): - return servicer.HalfDuplexCall(request) - stream_in_stream_out['HalfDuplexCall'] = HalfDuplexCall() - face_linked_pair = _face_testing.server_and_stub(default_timeout,inline_value_in_value_out_methods=value_in_value_out,inline_value_in_stream_out_methods=value_in_stream_out,inline_stream_in_value_out_methods=stream_in_value_out,inline_stream_in_stream_out_methods=stream_in_stream_out) - class LinkedPair(object): - def __init__(self, server, stub): - self.server = server - self.stub = stub - stub = _TestServiceStub(face_linked_pair.stub, default_timeout) - return LinkedPair(None, stub) +def early_adopter_create_TestService_server(servicer, port, private_key=None, certificate_chain=None): + import test.cpp.interop.empty_pb2 + import test.cpp.interop.empty_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + method_service_descriptions = { + "EmptyCall": utilities.unary_unary_service_description( + servicer.EmptyCall, + test.cpp.interop.empty_pb2.Empty.FromString, + test.cpp.interop.empty_pb2.Empty.SerializeToString, + ), + "FullDuplexCall": utilities.stream_stream_service_description( + servicer.FullDuplexCall, + test.cpp.interop.messages_pb2.StreamingOutputCallRequest.FromString, + test.cpp.interop.messages_pb2.StreamingOutputCallResponse.SerializeToString, + ), + "HalfDuplexCall": utilities.stream_stream_service_description( + servicer.HalfDuplexCall, + test.cpp.interop.messages_pb2.StreamingOutputCallRequest.FromString, + test.cpp.interop.messages_pb2.StreamingOutputCallResponse.SerializeToString, + ), + "StreamingInputCall": utilities.stream_unary_service_description( + servicer.StreamingInputCall, + test.cpp.interop.messages_pb2.StreamingInputCallRequest.FromString, + test.cpp.interop.messages_pb2.StreamingInputCallResponse.SerializeToString, + ), + "StreamingOutputCall": utilities.unary_stream_service_description( + servicer.StreamingOutputCall, + test.cpp.interop.messages_pb2.StreamingOutputCallRequest.FromString, + test.cpp.interop.messages_pb2.StreamingOutputCallResponse.SerializeToString, + ), + "UnaryCall": utilities.unary_unary_service_description( + servicer.UnaryCall, + test.cpp.interop.messages_pb2.SimpleRequest.FromString, + test.cpp.interop.messages_pb2.SimpleResponse.SerializeToString, + ), + } + return implementations.server("grpc.testing.TestService", method_service_descriptions, port, private_key=private_key, certificate_chain=certificate_chain) +def early_adopter_create_TestService_stub(host, port, metadata_transformer=None, secure=False, root_certificates=None, private_key=None, certificate_chain=None, server_host_override=None): + import test.cpp.interop.empty_pb2 + import test.cpp.interop.empty_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + import test.cpp.interop.messages_pb2 + method_invocation_descriptions = { + "EmptyCall": utilities.unary_unary_invocation_description( + test.cpp.interop.empty_pb2.Empty.SerializeToString, + test.cpp.interop.empty_pb2.Empty.FromString, + ), + "FullDuplexCall": utilities.stream_stream_invocation_description( + test.cpp.interop.messages_pb2.StreamingOutputCallRequest.SerializeToString, + test.cpp.interop.messages_pb2.StreamingOutputCallResponse.FromString, + ), + "HalfDuplexCall": utilities.stream_stream_invocation_description( + test.cpp.interop.messages_pb2.StreamingOutputCallRequest.SerializeToString, + test.cpp.interop.messages_pb2.StreamingOutputCallResponse.FromString, + ), + "StreamingInputCall": utilities.stream_unary_invocation_description( + test.cpp.interop.messages_pb2.StreamingInputCallRequest.SerializeToString, + test.cpp.interop.messages_pb2.StreamingInputCallResponse.FromString, + ), + "StreamingOutputCall": utilities.unary_stream_invocation_description( + test.cpp.interop.messages_pb2.StreamingOutputCallRequest.SerializeToString, + test.cpp.interop.messages_pb2.StreamingOutputCallResponse.FromString, + ), + "UnaryCall": utilities.unary_unary_invocation_description( + test.cpp.interop.messages_pb2.SimpleRequest.SerializeToString, + test.cpp.interop.messages_pb2.SimpleResponse.FromString, + ), + } + return implementations.stub("grpc.testing.TestService", method_invocation_descriptions, host, port, metadata_transformer=metadata_transformer, secure=secure, root_certificates=root_certificates, private_key=private_key, certificate_chain=certificate_chain, server_host_override=server_host_override) # @@protoc_insertion_point(module_scope) diff --git a/src/python/src/grpc/_adapter/_call.c b/src/python/src/grpc/_adapter/_call.c index f837267e9a..bf96c1a3fa 100644 --- a/src/python/src/grpc/_adapter/_call.c +++ b/src/python/src/grpc/_adapter/_call.c @@ -164,10 +164,10 @@ static const PyObject *pygrpc_call_add_metadata(Call *self, PyObject *args) { const char* key = NULL; const char* value = NULL; int value_length = 0; + grpc_metadata metadata; if (!PyArg_ParseTuple(args, "ss#", &key, &value, &value_length)) { return NULL; } - grpc_metadata metadata; metadata.key = key; metadata.value = value; metadata.value_length = value_length; diff --git a/src/python/src/grpc/_adapter/_call.h b/src/python/src/grpc/_adapter/_call.h index fabc6f399d..c04a2285f7 100644 --- a/src/python/src/grpc/_adapter/_call.h +++ b/src/python/src/grpc/_adapter/_call.h @@ -38,7 +38,7 @@ #include <grpc/grpc.h> typedef struct { - PyObject_HEAD; + PyObject_HEAD grpc_call *c_call; } Call; diff --git a/src/python/src/grpc/_adapter/_channel.h b/src/python/src/grpc/_adapter/_channel.h index 303b675192..afc0f80359 100644 --- a/src/python/src/grpc/_adapter/_channel.h +++ b/src/python/src/grpc/_adapter/_channel.h @@ -38,7 +38,7 @@ #include <grpc/grpc.h> typedef struct { - PyObject_HEAD; + PyObject_HEAD grpc_channel *c_channel; } Channel; diff --git a/src/python/src/grpc/_adapter/_client_credentials.h b/src/python/src/grpc/_adapter/_client_credentials.h index 47476ce15f..bb9f7f0c3a 100644 --- a/src/python/src/grpc/_adapter/_client_credentials.h +++ b/src/python/src/grpc/_adapter/_client_credentials.h @@ -38,7 +38,7 @@ #include <grpc/grpc_security.h> typedef struct { - PyObject_HEAD; + PyObject_HEAD grpc_credentials *c_client_credentials; } ClientCredentials; diff --git a/src/python/src/grpc/_adapter/_completion_queue.c b/src/python/src/grpc/_adapter/_completion_queue.c index 76d6b6cb44..a639eff53e 100644 --- a/src/python/src/grpc/_adapter/_completion_queue.c +++ b/src/python/src/grpc/_adapter/_completion_queue.c @@ -124,7 +124,7 @@ static PyObject *pygrpc_metadata_collection_get( PyObject *key = PyString_FromString(elem.key); PyObject *value = PyString_FromStringAndSize(elem.value, elem.value_length); PyObject* kvp = PyTuple_Pack(2, key, value); - // n.b. PyList_SetItem *steals* a reference to the set element. + /* n.b. PyList_SetItem *steals* a reference to the set element. */ PyList_SetItem(metadata, i, kvp); Py_DECREF(key); Py_DECREF(value); @@ -266,6 +266,7 @@ static PyObject *pygrpc_finished_event_args(grpc_event *c_event) { PyObject *details; PyObject *status; PyObject *event_args; + PyObject *metadata; code = pygrpc_status_code(c_event->data.finished.status); if (code == NULL) { @@ -285,7 +286,7 @@ static PyObject *pygrpc_finished_event_args(grpc_event *c_event) { if (status == NULL) { return NULL; } - PyObject* metadata = pygrpc_metadata_collection_get( + metadata = pygrpc_metadata_collection_get( c_event->data.finished.metadata_elements, c_event->data.finished.metadata_count); event_args = PyTuple_Pack(8, finish_event_kind, (PyObject *)c_event->tag, diff --git a/src/python/src/grpc/_adapter/_completion_queue.h b/src/python/src/grpc/_adapter/_completion_queue.h index 3a39476a2e..9b377d15d9 100644 --- a/src/python/src/grpc/_adapter/_completion_queue.h +++ b/src/python/src/grpc/_adapter/_completion_queue.h @@ -38,7 +38,7 @@ #include <grpc/grpc.h> typedef struct { - PyObject_HEAD; + PyObject_HEAD grpc_completion_queue *c_completion_queue; } CompletionQueue; diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py index cfdcc2c4bc..4987be389a 100644 --- a/src/python/src/grpc/_adapter/_links_test.py +++ b/src/python/src/grpc/_adapter/_links_test.py @@ -43,6 +43,14 @@ _IDENTITY = lambda x: x _TIMEOUT = 2 +# TODO(nathaniel): End-to-end metadata testing. +def _transform_metadata(unused_metadata): + return ( + ('one unused key', 'one unused value'), + ('another unused key', 'another unused value'), +) + + class RoundTripTest(unittest.TestCase): def setUp(self): @@ -76,7 +84,8 @@ class RoundTripTest(unittest.TestCase): rear_link = rear.RearLink( 'localhost', port, self.rear_link_pool, {test_method: None}, - {test_method: None}, False, None, None, None) + {test_method: None}, False, None, None, None, + metadata_transformer=_transform_metadata) rear_link.join_fore_link(test_fore_link) test_fore_link.join_rear_link(rear_link) rear_link.start() diff --git a/src/python/src/grpc/_adapter/_server.h b/src/python/src/grpc/_adapter/_server.h index 4248712c1c..4836bb638c 100644 --- a/src/python/src/grpc/_adapter/_server.h +++ b/src/python/src/grpc/_adapter/_server.h @@ -38,7 +38,7 @@ #include <grpc/grpc.h> typedef struct { - PyObject_HEAD; + PyObject_HEAD grpc_server *c_server; } Server; diff --git a/src/python/src/grpc/_adapter/_server_credentials.h b/src/python/src/grpc/_adapter/_server_credentials.h index bb6ff2c5bb..6090404bd9 100644 --- a/src/python/src/grpc/_adapter/_server_credentials.h +++ b/src/python/src/grpc/_adapter/_server_credentials.h @@ -38,7 +38,7 @@ #include <grpc/grpc_security.h> typedef struct { - PyObject_HEAD; + PyObject_HEAD grpc_server_credentials *c_server_credentials; } ServerCredentials; diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py index f19321c426..2b93aa6331 100644 --- a/src/python/src/grpc/_adapter/rear.py +++ b/src/python/src/grpc/_adapter/rear.py @@ -93,7 +93,7 @@ class RearLink(base_interfaces.RearLink, activated.Activated): def __init__( self, host, port, pool, request_serializers, response_deserializers, secure, root_certificates, private_key, certificate_chain, - server_host_override=None): + metadata_transformer=None, server_host_override=None): """Constructor. Args: @@ -111,6 +111,9 @@ class RearLink(base_interfaces.RearLink, activated.Activated): key should be used. certificate_chain: The PEM-encoded certificate chain to use or None if no certificate chain should be used. + metadata_transformer: A function that given a metadata object produces + another metadata to be used in the underlying communication on the + wire. server_host_override: (For testing only) the target name used for SSL host name checking. """ @@ -134,6 +137,7 @@ class RearLink(base_interfaces.RearLink, activated.Activated): self._root_certificates = root_certificates self._private_key = private_key self._certificate_chain = certificate_chain + self._metadata_transformer = metadata_transformer self._server_host_override = server_host_override def _on_write_event(self, operation_id, event, rpc_state): @@ -243,6 +247,10 @@ class RearLink(base_interfaces.RearLink, activated.Activated): """ request_serializer = self._request_serializers[name] call = _low.Call(self._channel, name, self._host, time.time() + timeout) + if self._metadata_transformer is not None: + metadata = self._metadata_transformer([]) + for metadata_key, metadata_value in metadata: + call.add_metadata(metadata_key, metadata_value) call.invoke(self._completion_queue, operation_id, operation_id) outstanding = set(_INVOCATION_EVENT_KINDS) diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py index 7d3d29f06c..35456d38c6 100644 --- a/src/python/src/grpc/early_adopter/implementations.py +++ b/src/python/src/grpc/early_adopter/implementations.py @@ -114,7 +114,7 @@ class _Stub(interfaces.Stub): def __init__( self, breakdown, host, port, secure, root_certificates, private_key, - certificate_chain, server_host_override=None): + certificate_chain, metadata_transformer=None, server_host_override=None): self._lock = threading.Lock() self._breakdown = breakdown self._host = host @@ -123,6 +123,7 @@ class _Stub(interfaces.Stub): self._root_certificates = root_certificates self._private_key = private_key self._certificate_chain = certificate_chain + self._metadata_transformer = metadata_transformer self._server_host_override = server_host_override self._pool = None @@ -141,6 +142,7 @@ class _Stub(interfaces.Stub): self._breakdown.request_serializers, self._breakdown.response_deserializers, self._secure, self._root_certificates, self._private_key, self._certificate_chain, + metadata_transformer=self._metadata_transformer, server_host_override=self._server_host_override) self._front.join_rear_link(self._rear_link) self._rear_link.join_fore_link(self._front) @@ -189,8 +191,9 @@ class _Stub(interfaces.Stub): def stub( - service_name, methods, host, port, secure=False, root_certificates=None, - private_key=None, certificate_chain=None, server_host_override=None): + service_name, methods, host, port, metadata_transformer=None, secure=False, + root_certificates=None, private_key=None, certificate_chain=None, + server_host_override=None): """Constructs an interfaces.Stub. Args: @@ -201,6 +204,9 @@ def stub( not qualified by the service name or decorated in any other way. host: The host to which to connect for RPC service. port: The port to which to connect for RPC service. + metadata_transformer: A callable that given a metadata object produces + another metadata object to be used in the underlying communication on the + wire. secure: Whether or not to construct the stub with a secure connection. root_certificates: The PEM-encoded root certificates or None to ask for them to be retrieved from a default location. |