aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-03-06 13:47:58 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-03-06 13:47:58 -0800
commitc20324e8aa3d98237100a4edf2fe19c212f9b8fc (patch)
tree69ac9ffef7a66faf5c599301440c2290231a5e81
parentbea386b1c6e8f93cb8e6deb64f132dd1c99b09de (diff)
parent3e0f48b2ec8d8d64728f3662d9e2180bdbd9c803 (diff)
Merge github.com:grpc/grpc into credit
-rw-r--r--src/core/surface/channel.c2
-rw-r--r--src/core/surface/init.c16
-rw-r--r--src/core/surface/init.h1
-rw-r--r--src/core/surface/server.c4
-rw-r--r--src/csharp/.gitignore1
-rw-r--r--src/csharp/Grpc.Core.Tests/PInvokeTest.cs2
-rw-r--r--src/csharp/Grpc.Core/ChannelArgs.cs2
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj7
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs577
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs407
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs125
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCompletion.cs95
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs56
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs41
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs15
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs45
-rw-r--r--src/csharp/Grpc.Core/Internal/Timespec.cs48
-rw-r--r--src/csharp/Grpc.Core/OperationFailedException.cs48
-rw-r--r--src/csharp/Grpc.Core/ServerCallHandler.cs34
-rw-r--r--src/csharp/Grpc.Core/Status.cs62
-rw-r--r--src/csharp/Grpc.Core/Utils/Preconditions.cs113
-rw-r--r--src/csharp/Grpc.Examples.MathClient/MathClient.cs27
-rw-r--r--src/csharp/Grpc.Examples/MathExamples.cs128
-rwxr-xr-xsrc/ruby/grpc.gemspec4
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb61
-rw-r--r--test/compiler/python_plugin_test.py50
-rwxr-xr-xtools/run_tests/python_tests.json64
-rwxr-xr-xtools/run_tests/run_python.sh2
-rwxr-xr-xtools/run_tests/run_tests.py11
29 files changed, 1269 insertions, 779 deletions
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index e38734c6a4..e764a3b9af 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -39,6 +39,7 @@
#include "src/core/iomgr/iomgr.h"
#include "src/core/surface/call.h"
#include "src/core/surface/client.h"
+#include "src/core/surface/init.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -63,6 +64,7 @@ grpc_channel *grpc_channel_create_from_filters(
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
grpc_channel *channel = gpr_malloc(size);
+ GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
channel->is_client = is_client;
/* decremented by grpc_channel_destroy, and grpc_client_channel_closed if is_client */
gpr_ref_init(&channel->refs, 1 + is_client);
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 4db66fb66e..e48c4202e5 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -40,17 +40,17 @@
#include "src/core/surface/surface_trace.h"
#include "src/core/transport/chttp2_transport.h"
-static gpr_once g_init = GPR_ONCE_INIT;
+static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
static int g_initializations;
-static void do_init(void) {
+static void do_basic_init(void) {
gpr_mu_init(&g_init_mu);
g_initializations = 0;
}
void grpc_init(void) {
- gpr_once_init(&g_init, do_init);
+ gpr_once_init(&g_basic_init, do_basic_init);
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
@@ -73,3 +73,13 @@ void grpc_shutdown(void) {
}
gpr_mu_unlock(&g_init_mu);
}
+
+int grpc_is_initialized(void) {
+ int r;
+ gpr_once_init(&g_basic_init, do_basic_init);
+ gpr_mu_lock(&g_init_mu);
+ r = g_initializations > 0;
+ gpr_mu_unlock(&g_init_mu);
+ return r;
+}
+
diff --git a/src/core/surface/init.h b/src/core/surface/init.h
index ab40bedf87..416874020d 100644
--- a/src/core/surface/init.h
+++ b/src/core/surface/init.h
@@ -35,5 +35,6 @@
#define GRPC_INTERNAL_CORE_SURFACE_INIT_H
void grpc_security_pre_init(void);
+int grpc_is_initialized(void);
#endif /* GRPC_INTERNAL_CORE_SURFACE_INIT_H */
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index c99a1b4cc9..424734c54c 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -44,6 +44,7 @@
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
+#include "src/core/surface/init.h"
#include "src/core/transport/metadata.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -612,6 +613,9 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
int census_enabled = grpc_channel_args_is_census_enabled(args);
grpc_server *server = gpr_malloc(sizeof(grpc_server));
+
+ GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
+
memset(server, 0, sizeof(grpc_server));
if (cq) addcq(server, cq);
diff --git a/src/csharp/.gitignore b/src/csharp/.gitignore
index 4f4cd1f7d1..dbaf60de0c 100644
--- a/src/csharp/.gitignore
+++ b/src/csharp/.gitignore
@@ -1,4 +1,5 @@
*.userprefs
+StyleCop.Cache
test-results
packages
Grpc.v12.suo
diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
index 282d521ba3..9db08d2f02 100644
--- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
+++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
@@ -127,8 +127,6 @@ namespace Grpc.Core.Tests
[Test]
public void NopPInvokeBenchmark()
{
- CompletionCallbackDelegate handler = Handler;
-
BenchmarkUtil.RunBenchmark(
1000000, 100000000,
() => {
diff --git a/src/csharp/Grpc.Core/ChannelArgs.cs b/src/csharp/Grpc.Core/ChannelArgs.cs
index 653a5780a3..298b6edf20 100644
--- a/src/csharp/Grpc.Core/ChannelArgs.cs
+++ b/src/csharp/Grpc.Core/ChannelArgs.cs
@@ -99,7 +99,7 @@ namespace Grpc.Core
}
return nativeArgs;
}
- catch (Exception e)
+ catch (Exception)
{
if (nativeArgs != null)
{
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 93d5430591..78b6cdde59 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -51,7 +51,6 @@
<Compile Include="Internal\SafeHandleZeroIsInvalid.cs" />
<Compile Include="Internal\Timespec.cs" />
<Compile Include="Internal\GrpcThreadPool.cs" />
- <Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Internal\ServerSafeHandle.cs" />
<Compile Include="Method.cs" />
<Compile Include="ServerCalls.cs" />
@@ -69,6 +68,12 @@
<Compile Include="Credentials.cs" />
<Compile Include="Internal\ChannelArgsSafeHandle.cs" />
<Compile Include="ChannelArgs.cs" />
+ <Compile Include="Internal\AsyncCompletion.cs" />
+ <Compile Include="Internal\AsyncCallBase.cs" />
+ <Compile Include="Internal\AsyncCallServer.cs" />
+ <Compile Include="OperationFailedException.cs" />
+ <Compile Include="Internal\AsyncCall.cs" />
+ <Compile Include="Utils\Preconditions.cs" />
</ItemGroup>
<Choose>
<!-- Under older versions of Monodevelop, Choose is not supported and is just
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 6f37b059f7..5ae036298b 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -43,84 +43,47 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
- /// Handles native call lifecycle and provides convenience methods.
+ /// Handles client side native call lifecycle.
/// </summary>
- internal class AsyncCall<TWrite, TRead>
+ internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
- readonly Func<TWrite, byte[]> serializer;
- readonly Func<byte[], TRead> deserializer;
-
readonly CompletionCallbackDelegate unaryResponseHandler;
readonly CompletionCallbackDelegate finishedHandler;
- readonly CompletionCallbackDelegate writeFinishedHandler;
- readonly CompletionCallbackDelegate readFinishedHandler;
- readonly CompletionCallbackDelegate halfclosedHandler;
- readonly CompletionCallbackDelegate finishedServersideHandler;
-
- object myLock = new object();
- GCHandle gchandle;
- CallSafeHandle call;
- bool disposed;
-
- bool server;
-
- bool started;
- bool errorOccured;
- bool cancelRequested;
- bool readingDone;
- bool halfcloseRequested;
- bool halfclosed;
- bool finished;
-
- // Completion of a pending write if not null.
- TaskCompletionSource<object> writeTcs;
-
- // Completion of a pending read if not null.
- TaskCompletionSource<TRead> readTcs;
-
- // Completion of a pending halfclose if not null.
- TaskCompletionSource<object> halfcloseTcs;
// Completion of a pending unary response if not null.
- TaskCompletionSource<TRead> unaryResponseTcs;
+ TaskCompletionSource<TResponse> unaryResponseTcs;
- // Set after status is received on client. Only used for server streaming and duplex streaming calls.
+ // Set after status is received. Only used for streaming response calls.
Nullable<Status> finishedStatus;
- TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
- // For streaming, the reads will be delivered to this observer.
- IObserver<TRead> readObserver;
+ bool readObserverCompleted; // True if readObserver has already been completed.
- public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
+ public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
{
- this.serializer = serializer;
- this.deserializer = deserializer;
- this.unaryResponseHandler = HandleUnaryResponse;
- this.finishedHandler = HandleFinished;
- this.writeFinishedHandler = HandleWriteFinished;
- this.readFinishedHandler = HandleReadFinished;
- this.halfclosedHandler = HandleHalfclosed;
- this.finishedServersideHandler = HandleFinishedServerside;
+ this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse);
+ this.finishedHandler = CreateBatchCompletionCallback(HandleFinished);
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName)
{
- InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false);
+ var call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture);
+ InitializeInternal(call);
}
- public void InitializeServer(CallSafeHandle call)
- {
- InitializeInternal(call, true);
- }
-
- public TRead UnaryCall(Channel channel, String methodName, TWrite msg)
+ // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
+ // it is reusing fair amount of code in this class, so we are leaving it here.
+ // TODO: for other calls, you need to call Initialize, this methods calls initialize
+ // on its own, so there's a usage inconsistency.
+ /// <summary>
+ /// Blocking unary request - unary response call.
+ /// </summary>
+ public TResponse UnaryCall(Channel channel, String methodName, TRequest msg)
{
using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
- // TODO: handle serialization error...
- byte[] payload = serializer(msg);
+ byte[] payload = UnsafeSerialize(msg);
- unaryResponseTcs = new TaskCompletionSource<TRead>();
+ unaryResponseTcs = new TaskCompletionSource<TResponse>();
lock (myLock)
{
@@ -143,508 +106,200 @@ namespace Grpc.Core.Internal
}
}
- public Task<TRead> UnaryCallAsync(TWrite msg)
+ /// <summary>
+ /// Starts a unary request - unary response call.
+ /// </summary>
+ public Task<TResponse> UnaryCallAsync(TRequest msg)
{
lock (myLock)
{
+ Preconditions.CheckNotNull(call);
+
started = true;
halfcloseRequested = true;
readingDone = true;
- // TODO: handle serialization error...
- byte[] payload = serializer(msg);
+ byte[] payload = UnsafeSerialize(msg);
- unaryResponseTcs = new TaskCompletionSource<TRead>();
+ unaryResponseTcs = new TaskCompletionSource<TResponse>();
call.StartUnary(payload, unaryResponseHandler);
return unaryResponseTcs.Task;
}
}
- public Task<TRead> ClientStreamingCallAsync()
+ /// <summary>
+ /// Starts a streamed request - unary response call.
+ /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
+ /// </summary>
+ public Task<TResponse> ClientStreamingCallAsync()
{
lock (myLock)
{
+ Preconditions.CheckNotNull(call);
+
started = true;
readingDone = true;
- unaryResponseTcs = new TaskCompletionSource<TRead>();
+ unaryResponseTcs = new TaskCompletionSource<TResponse>();
call.StartClientStreaming(unaryResponseHandler);
return unaryResponseTcs.Task;
}
}
- public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver)
+ /// <summary>
+ /// Starts a unary request - streamed response call.
+ /// </summary>
+ public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver)
{
lock (myLock)
{
+ Preconditions.CheckNotNull(call);
+
started = true;
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
this.readObserver = readObserver;
- // TODO: handle serialization error...
- byte[] payload = serializer(msg);
+ byte[] payload = UnsafeSerialize(msg);
call.StartServerStreaming(payload, finishedHandler);
- ReceiveMessageAsync();
+ StartReceiveMessage();
}
}
- public void StartDuplexStreamingCall(IObserver<TRead> readObserver)
+ /// <summary>
+ /// Starts a streaming request - streaming response call.
+ /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
+ /// </summary>
+ public void StartDuplexStreamingCall(IObserver<TResponse> readObserver)
{
lock (myLock)
{
+ Preconditions.CheckNotNull(call);
+
started = true;
this.readObserver = readObserver;
call.StartDuplexStreaming(finishedHandler);
- ReceiveMessageAsync();
+ StartReceiveMessage();
}
}
- public Task ServerSideUnaryRequestCallAsync()
- {
- lock (myLock)
- {
- started = true;
- call.StartServerSide(finishedServersideHandler);
- return finishedServersideTcs.Task;
- }
- }
-
- public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver)
- {
- lock (myLock)
- {
- started = true;
- call.StartServerSide(finishedServersideHandler);
-
- if (this.readObserver != null)
- {
- throw new InvalidOperationException("Already registered an observer.");
- }
- this.readObserver = readObserver;
- ReceiveMessageAsync();
-
- return finishedServersideTcs.Task;
- }
- }
-
- public Task SendMessageAsync(TWrite msg)
+ /// <summary>
+ /// Sends a streaming request. Only one pending send action is allowed at any given time.
+ /// completionDelegate is called when the operation finishes.
+ /// </summary>
+ public void StartSendMessage(TRequest msg, AsyncCompletionDelegate completionDelegate)
{
- lock (myLock)
- {
- CheckNotDisposed();
- CheckStarted();
- CheckNoError();
-
- if (halfcloseRequested)
- {
- throw new InvalidOperationException("Already halfclosed.");
- }
-
- if (writeTcs != null)
- {
- throw new InvalidOperationException("Only one write can be pending at a time");
- }
-
- // TODO: wrap serialization...
- byte[] payload = serializer(msg);
-
- call.StartSendMessage(payload, writeFinishedHandler);
- writeTcs = new TaskCompletionSource<object>();
- return writeTcs.Task;
- }
+ StartSendMessageInternal(msg, completionDelegate);
}
- public Task SendCloseFromClientAsync()
+ /// <summary>
+ /// Sends halfclose, indicating client is done with streaming requests.
+ /// Only one pending send action is allowed at any given time.
+ /// completionDelegate is called when the operation finishes.
+ /// </summary>
+ public void StartSendCloseFromClient(AsyncCompletionDelegate completionDelegate)
{
lock (myLock)
{
- CheckNotDisposed();
- CheckStarted();
- CheckNoError();
-
- if (halfcloseRequested)
- {
- throw new InvalidOperationException("Already halfclosed.");
- }
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ CheckSendingAllowed();
call.StartSendCloseFromClient(halfclosedHandler);
halfcloseRequested = true;
- halfcloseTcs = new TaskCompletionSource<object>();
- return halfcloseTcs.Task;
- }
- }
-
- public Task SendStatusFromServerAsync(Status status)
- {
- lock (myLock)
- {
- CheckNotDisposed();
- CheckStarted();
- CheckNoError();
-
- if (halfcloseRequested)
- {
- throw new InvalidOperationException("Already halfclosed.");
- }
-
- call.StartSendStatusFromServer(status, halfclosedHandler);
- halfcloseRequested = true;
- halfcloseTcs = new TaskCompletionSource<object>();
- return halfcloseTcs.Task;
+ sendCompletionDelegate = completionDelegate;
}
}
- public Task<TRead> ReceiveMessageAsync()
+ /// <summary>
+ /// On client-side, we only fire readObserver.OnCompleted once all messages have been read
+ /// and status has been received.
+ /// </summary>
+ protected override void CompleteReadObserver()
{
- lock (myLock)
+ if (readingDone && finishedStatus.HasValue)
{
- CheckNotDisposed();
- CheckStarted();
- CheckNoError();
-
- if (readingDone)
- {
- throw new InvalidOperationException("Already read the last message.");
- }
-
- if (readTcs != null)
+ bool shouldComplete;
+ lock (myLock)
{
- throw new InvalidOperationException("Only one read can be pending at a time");
+ shouldComplete = !readObserverCompleted;
+ readObserverCompleted = true;
}
- call.StartReceiveMessage(readFinishedHandler);
-
- readTcs = new TaskCompletionSource<TRead>();
- return readTcs.Task;
- }
- }
-
- public void Cancel()
- {
- lock (myLock)
- {
- CheckNotDisposed();
- CheckStarted();
- cancelRequested = true;
- }
- // grpc_call_cancel is threadsafe
- call.Cancel();
- }
-
- public void CancelWithStatus(Status status)
- {
- lock (myLock)
- {
- CheckNotDisposed();
- CheckStarted();
- cancelRequested = true;
- }
- // grpc_call_cancel_with_status is threadsafe
- call.CancelWithStatus(status);
- }
-
- private void InitializeInternal(CallSafeHandle call, bool server)
- {
- lock (myLock)
- {
- // Make sure this object and the delegated held by it will not be garbage collected
- // before we release this handle.
- gchandle = GCHandle.Alloc(this);
- this.call = call;
- this.server = server;
- }
- }
-
- private void CheckStarted()
- {
- if (!started)
- {
- throw new InvalidOperationException("Call not started");
- }
- }
-
- private void CheckNotDisposed()
- {
- if (disposed)
- {
- throw new InvalidOperationException("Call has already been disposed.");
- }
- }
-
- private void CheckNoError()
- {
- if (errorOccured)
- {
- throw new InvalidOperationException("Error occured when processing call.");
- }
- }
-
- private bool ReleaseResourcesIfPossible()
- {
- if (!disposed && call != null)
- {
- if (halfclosed && readingDone && finished)
+ if (shouldComplete)
{
- ReleaseResources();
- return true;
+ var status = finishedStatus.Value;
+ if (status.StatusCode != StatusCode.OK)
+ {
+ FireReadObserverOnError(new RpcException(status));
+ }
+ else
+ {
+ FireReadObserverOnCompleted();
+ }
}
}
- return false;
- }
-
- private void ReleaseResources()
- {
- if (call != null) {
- call.Dispose();
- }
- gchandle.Free();
- disposed = true;
- }
-
- private void CompleteStreamObserver(Status status)
- {
- if (status.StatusCode != StatusCode.OK)
- {
- // TODO: wrap to handle exceptions;
- readObserver.OnError(new RpcException(status));
- } else {
- // TODO: wrap to handle exceptions;
- readObserver.OnCompleted();
- }
}
/// <summary>
/// Handler for unary response completion.
/// </summary>
- private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr)
+ private void HandleUnaryResponse(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
- try
+ lock(myLock)
{
- TaskCompletionSource<TRead> tcs;
- lock(myLock)
- {
- finished = true;
- halfclosed = true;
- tcs = unaryResponseTcs;
-
- ReleaseResourcesIfPossible();
- }
-
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+ finished = true;
+ halfclosed = true;
- if (error != GRPCOpError.GRPC_OP_OK)
- {
- tcs.SetException(new RpcException(
- new Status(StatusCode.Internal, "Internal error occured.")
- ));
- return;
- }
-
- var status = ctx.GetReceivedStatus();
- if (status.StatusCode != StatusCode.OK)
- {
- tcs.SetException(new RpcException(status));
- return;
- }
-
- // TODO: handle deserialize error...
- var msg = deserializer(ctx.GetReceivedMessage());
- tcs.SetResult(msg);
- }
- catch(Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
+ ReleaseResourcesIfPossible();
}
- }
-
- private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr)
- {
- try
- {
- TaskCompletionSource<object> oldTcs = null;
- lock (myLock)
- {
- oldTcs = writeTcs;
- writeTcs = null;
- }
-
- if (errorOccured)
- {
- // TODO: use the right type of exception...
- oldTcs.SetException(new Exception("Write failed"));
- }
- else
- {
- // TODO: where does the continuation run?
- oldTcs.SetResult(null);
- }
- }
- catch(Exception e)
+ if (wasError)
{
- Console.WriteLine("Caught exception in a native handler: " + e);
+ unaryResponseTcs.SetException(new RpcException(
+ new Status(StatusCode.Internal, "Internal error occured.")
+ ));
+ return;
}
- }
-
- private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr)
- {
- try
- {
- lock (myLock)
- {
- halfclosed = true;
- ReleaseResourcesIfPossible();
- }
-
- if (error != GRPCOpError.GRPC_OP_OK)
- {
- halfcloseTcs.SetException(new Exception("Halfclose failed"));
-
- }
- else
- {
- halfcloseTcs.SetResult(null);
- }
- }
- catch(Exception e)
+ var status = ctx.GetReceivedStatus();
+ if (status.StatusCode != StatusCode.OK)
{
- Console.WriteLine("Caught exception in a native handler: " + e);
+ unaryResponseTcs.SetException(new RpcException(status));
+ return;
}
- }
-
- private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr)
- {
- try
- {
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
- var payload = ctx.GetReceivedMessage();
-
- TaskCompletionSource<TRead> oldTcs = null;
- IObserver<TRead> observer = null;
-
- Nullable<Status> status = null;
-
- lock (myLock)
- {
- oldTcs = readTcs;
- readTcs = null;
- if (payload == null)
- {
- readingDone = true;
- }
- observer = readObserver;
- status = finishedStatus;
-
- ReleaseResourcesIfPossible();
- }
-
- // TODO: wrap deserialization...
- TRead msg = payload != null ? deserializer(payload) : default(TRead);
- oldTcs.SetResult(msg);
+ // TODO: handle deserialization error
+ TResponse msg;
+ TryDeserialize(ctx.GetReceivedMessage(), out msg);
- // TODO: make sure we deliver reads in the right order.
-
- if (observer != null)
- {
- if (payload != null)
- {
- // TODO: wrap to handle exceptions
- observer.OnNext(msg);
-
- // start a new read
- ReceiveMessageAsync();
- }
- else
- {
- if (!server)
- {
- if (status.HasValue)
- {
- CompleteStreamObserver(status.Value);
- }
- }
- else
- {
- // TODO: wrap to handle exceptions..
- observer.OnCompleted();
- }
- // TODO: completeStreamObserver serverside...
- }
- }
- }
- catch(Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
+ unaryResponseTcs.SetResult(msg);
}
- private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr)
+ /// <summary>
+ /// Handles receive status completion for calls with streaming response.
+ /// </summary>
+ private void HandleFinished(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
- try
- {
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
- var status = ctx.GetReceivedStatus();
-
- bool wasReadingDone;
-
- lock (myLock)
- {
- finished = true;
- finishedStatus = status;
-
- wasReadingDone = readingDone;
-
- ReleaseResourcesIfPossible();
- }
-
- if (wasReadingDone) {
- CompleteStreamObserver(status);
- }
-
- }
- catch(Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
- }
+ var status = ctx.GetReceivedStatus();
- private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr)
- {
- try
+ lock (myLock)
{
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
-
- lock(myLock)
- {
- finished = true;
-
- // TODO: because of the way server calls are implemented, we need to set
- // reading done to true here. Should be fixed in the future.
- readingDone = true;
-
- ReleaseResourcesIfPossible();
- }
- // TODO: handle error ...
-
- finishedServersideTcs.SetResult(null);
+ finished = true;
+ finishedStatus = status;
+ ReleaseResourcesIfPossible();
}
- catch(Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
+
+ CompleteReadObserver();
}
}
} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
new file mode 100644
index 0000000000..44d66b394c
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -0,0 +1,407 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// Base for handling both client side and server side calls.
+ /// Handles native call lifecycle and provides convenience methods.
+ /// </summary>
+ internal abstract class AsyncCallBase<TWrite, TRead>
+ {
+ readonly Func<TWrite, byte[]> serializer;
+ readonly Func<byte[], TRead> deserializer;
+
+ protected readonly CompletionCallbackDelegate sendFinishedHandler;
+ protected readonly CompletionCallbackDelegate readFinishedHandler;
+ protected readonly CompletionCallbackDelegate halfclosedHandler;
+
+ protected readonly object myLock = new object();
+
+ protected GCHandle gchandle;
+ protected CallSafeHandle call;
+ protected bool disposed;
+
+ protected bool started;
+ protected bool errorOccured;
+ protected bool cancelRequested;
+
+ protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
+ protected bool readPending; // True if there is a read in progress.
+ protected bool readingDone;
+ protected bool halfcloseRequested;
+ protected bool halfclosed;
+ protected bool finished; // True if close has been received from the peer.
+
+ // Streaming reads will be delivered to this observer. For a call that only does unary read it may remain null.
+ protected IObserver<TRead> readObserver;
+
+ public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
+ {
+ this.serializer = Preconditions.CheckNotNull(serializer);
+ this.deserializer = Preconditions.CheckNotNull(deserializer);
+
+ this.sendFinishedHandler = CreateBatchCompletionCallback(HandleSendFinished);
+ this.readFinishedHandler = CreateBatchCompletionCallback(HandleReadFinished);
+ this.halfclosedHandler = CreateBatchCompletionCallback(HandleHalfclosed);
+ }
+
+ /// <summary>
+ /// Requests cancelling the call.
+ /// </summary>
+ public void Cancel()
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckState(started);
+ cancelRequested = true;
+
+ if (!disposed)
+ {
+ call.Cancel();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Requests cancelling the call with given status.
+ /// </summary>
+ public void CancelWithStatus(Status status)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckState(started);
+ cancelRequested = true;
+
+ if (!disposed)
+ {
+ call.CancelWithStatus(status);
+ }
+ }
+ }
+
+ protected void InitializeInternal(CallSafeHandle call)
+ {
+ lock (myLock)
+ {
+ // Make sure this object and the delegated held by it will not be garbage collected
+ // before we release this handle.
+ gchandle = GCHandle.Alloc(this);
+ this.call = call;
+ }
+ }
+
+ /// <summary>
+ /// Initiates sending a message. Only once send operation can be active at a time.
+ /// completionDelegate is invoked upon completion.
+ /// </summary>
+ protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate completionDelegate)
+ {
+ byte[] payload = UnsafeSerialize(msg);
+
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ CheckSendingAllowed();
+
+ call.StartSendMessage(payload, sendFinishedHandler);
+ sendCompletionDelegate = completionDelegate;
+ }
+ }
+
+ /// <summary>
+ /// Requests receiving a next message.
+ /// </summary>
+ protected void StartReceiveMessage()
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckState(started);
+ Preconditions.CheckState(!disposed);
+ Preconditions.CheckState(!errorOccured);
+
+ Preconditions.CheckState(!readingDone);
+ Preconditions.CheckState(!readPending);
+
+ call.StartReceiveMessage(readFinishedHandler);
+ readPending = true;
+ }
+ }
+
+ /// <summary>
+ /// Default behavior just completes the read observer, but more sofisticated behavior might be required
+ /// by subclasses.
+ /// </summary>
+ protected virtual void CompleteReadObserver()
+ {
+ FireReadObserverOnCompleted();
+ }
+
+ /// <summary>
+ /// If there are no more pending actions and no new actions can be started, releases
+ /// the underlying native resources.
+ /// </summary>
+ protected bool ReleaseResourcesIfPossible()
+ {
+ if (!disposed && call != null)
+ {
+ if (halfclosed && readingDone && finished)
+ {
+ ReleaseResources();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void ReleaseResources()
+ {
+ if (call != null)
+ {
+ call.Dispose();
+ }
+ gchandle.Free();
+ disposed = true;
+ }
+
+ protected void CheckSendingAllowed()
+ {
+ Preconditions.CheckState(started);
+ Preconditions.CheckState(!disposed);
+ Preconditions.CheckState(!errorOccured);
+
+ Preconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
+ Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
+ }
+
+ protected byte[] UnsafeSerialize(TWrite msg)
+ {
+ return serializer(msg);
+ }
+
+ protected bool TrySerialize(TWrite msg, out byte[] payload)
+ {
+ try
+ {
+ payload = serializer(msg);
+ return true;
+ }
+ catch(Exception)
+ {
+ Console.WriteLine("Exception occured while trying to serialize message");
+ payload = null;
+ return false;
+ }
+ }
+
+ protected bool TryDeserialize(byte[] payload, out TRead msg)
+ {
+ try
+ {
+ msg = deserializer(payload);
+ return true;
+ }
+ catch(Exception)
+ {
+ Console.WriteLine("Exception occured while trying to deserialize message");
+ msg = default(TRead);
+ return false;
+ }
+ }
+
+ protected void FireReadObserverOnNext(TRead value)
+ {
+ try
+ {
+ readObserver.OnNext(value);
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking readObserver.OnNext: " + e);
+ }
+ }
+
+ protected void FireReadObserverOnCompleted()
+ {
+ try
+ {
+ readObserver.OnCompleted();
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking readObserver.OnCompleted: " + e);
+ }
+ }
+
+ protected void FireReadObserverOnError(Exception error)
+ {
+ try
+ {
+ readObserver.OnError(error);
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking readObserver.OnError: " + e);
+ }
+ }
+
+ protected void FireCompletion(AsyncCompletionDelegate completionDelegate, Exception error)
+ {
+ try
+ {
+ completionDelegate(error);
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking completion delegate: " + e);
+ }
+ }
+
+ /// <summary>
+ /// Creates completion callback delegate that wraps the batch completion handler in a try catch block to
+ /// prevent propagating exceptions accross managed/unmanaged boundary.
+ /// </summary>
+ protected CompletionCallbackDelegate CreateBatchCompletionCallback(Action<bool, BatchContextSafeHandleNotOwned> handler)
+ {
+ return new CompletionCallbackDelegate( (error, batchContextPtr) => {
+ try
+ {
+ var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+ bool wasError = (error != GRPCOpError.GRPC_OP_OK);
+ handler(wasError, ctx);
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Caught exception in a native handler: " + e);
+ }
+ });
+ }
+
+ /// <summary>
+ /// Handles send completion.
+ /// </summary>
+ private void HandleSendFinished(bool wasError, BatchContextSafeHandleNotOwned ctx)
+ {
+ AsyncCompletionDelegate origCompletionDelegate = null;
+ lock (myLock)
+ {
+ origCompletionDelegate = sendCompletionDelegate;
+ sendCompletionDelegate = null;
+
+ ReleaseResourcesIfPossible();
+ }
+
+ if (wasError)
+ {
+ FireCompletion(origCompletionDelegate, new OperationFailedException("Send failed"));
+ }
+ else
+ {
+ FireCompletion(origCompletionDelegate, null);
+ }
+ }
+
+ /// <summary>
+ /// Handles halfclose completion.
+ /// </summary>
+ private void HandleHalfclosed(bool wasError, BatchContextSafeHandleNotOwned ctx)
+ {
+ AsyncCompletionDelegate origCompletionDelegate = null;
+ lock (myLock)
+ {
+ halfclosed = true;
+ origCompletionDelegate = sendCompletionDelegate;
+ sendCompletionDelegate = null;
+
+ ReleaseResourcesIfPossible();
+ }
+
+ if (wasError)
+ {
+ FireCompletion(origCompletionDelegate, new OperationFailedException("Halfclose failed"));
+ }
+ else
+ {
+ FireCompletion(origCompletionDelegate, null);
+ }
+
+ }
+
+ /// <summary>
+ /// Handles streaming read completion.
+ /// </summary>
+ private void HandleReadFinished(bool wasError, BatchContextSafeHandleNotOwned ctx)
+ {
+ var payload = ctx.GetReceivedMessage();
+
+ lock (myLock)
+ {
+ readPending = false;
+ if (payload == null)
+ {
+ readingDone = true;
+ }
+
+ ReleaseResourcesIfPossible();
+ }
+
+ // TODO: handle the case when error occured...
+
+ if (payload != null)
+ {
+ // TODO: handle deserialization error
+ TRead msg;
+ TryDeserialize(payload, out msg);
+
+ FireReadObserverOnNext(msg);
+
+ // Start a new read. The current one has already been delivered,
+ // so correct ordering of reads is assured.
+ StartReceiveMessage();
+ }
+ else
+ {
+ CompleteReadObserver();
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
new file mode 100644
index 0000000000..d3a2be553f
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -0,0 +1,125 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// Handles server side native call lifecycle.
+ /// </summary>
+ internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
+ {
+ readonly CompletionCallbackDelegate finishedServersideHandler;
+ readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
+
+ public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer)
+ {
+ this.finishedServersideHandler = CreateBatchCompletionCallback(HandleFinishedServerside);
+ }
+
+ public void Initialize(CallSafeHandle call)
+ {
+ InitializeInternal(call);
+ }
+
+ /// <summary>
+ /// Starts a server side call. Currently, all server side calls are implemented as duplex
+ /// streaming call and they are adapted to the appropriate streaming arity.
+ /// </summary>
+ public Task ServerSideCallAsync(IObserver<TRequest> readObserver)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(call);
+
+ started = true;
+ this.readObserver = readObserver;
+
+ call.StartServerSide(finishedServersideHandler);
+ StartReceiveMessage();
+ return finishedServersideTcs.Task;
+ }
+ }
+
+ /// <summary>
+ /// Sends a streaming response. Only one pending send action is allowed at any given time.
+ /// completionDelegate is called when the operation finishes.
+ /// </summary>
+ public void StartSendMessage(TResponse msg, AsyncCompletionDelegate completionDelegate)
+ {
+ StartSendMessageInternal(msg, completionDelegate);
+ }
+
+ /// <summary>
+ /// Sends call result status, also indicating server is done with streaming responses.
+ /// Only one pending send action is allowed at any given time.
+ /// completionDelegate is called when the operation finishes.
+ /// </summary>
+ public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate completionDelegate)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ CheckSendingAllowed();
+
+ call.StartSendStatusFromServer(status, halfclosedHandler);
+ halfcloseRequested = true;
+ sendCompletionDelegate = completionDelegate;
+ }
+ }
+
+ /// <summary>
+ /// Handles the server side close completion.
+ /// </summary>
+ private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx)
+ {
+ lock (myLock)
+ {
+ finished = true;
+
+ ReleaseResourcesIfPossible();
+ }
+ // TODO: handle error ...
+
+ finishedServersideTcs.SetResult(null);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs b/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
new file mode 100644
index 0000000000..b78bb497fa
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
@@ -0,0 +1,95 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// If error != null, there's been an error or operation has been cancelled.
+ /// </summary>
+ internal delegate void AsyncCompletionDelegate(Exception error);
+
+ /// <summary>
+ /// Helper for transforming AsyncCompletionDelegate into full-fledged Task.
+ /// </summary>
+ internal class AsyncCompletionTaskSource
+ {
+ readonly TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
+ readonly AsyncCompletionDelegate completionDelegate;
+
+ public AsyncCompletionTaskSource()
+ {
+ completionDelegate = new AsyncCompletionDelegate(HandleCompletion);
+ }
+
+ public Task Task
+ {
+ get
+ {
+ return tcs.Task;
+ }
+ }
+
+ public AsyncCompletionDelegate CompletionDelegate
+ {
+ get
+ {
+ return completionDelegate;
+ }
+ }
+
+ private void HandleCompletion(Exception error)
+ {
+ if (error == null)
+ {
+ tcs.SetResult(null);
+ return;
+ }
+ if (error is OperationCanceledException)
+ {
+ tcs.SetCanceled();
+ return;
+ }
+ tcs.SetException(error);
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 1c0bc98f06..61566b5407 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -1,5 +1,4 @@
#region Copyright notice and license
-
// Copyright 2015, Google Inc.
// All rights reserved.
//
@@ -30,7 +29,6 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
-
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
@@ -38,14 +36,12 @@ using Grpc.Core;
namespace Grpc.Core.Internal
{
- //TODO: rename the delegate
- internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr);
-
+ internal delegate void CompletionCallbackDelegate(GRPCOpError error,IntPtr batchContextPtr);
/// <summary>
/// grpc_call from <grpc/grpc.h>
/// </summary>
- internal class CallSafeHandle : SafeHandleZeroIsInvalid
- {
+ internal class CallSafeHandle : SafeHandleZeroIsInvalid
+ {
const UInt32 GRPC_WRITE_BUFFER_HINT = 1;
[DllImport("grpc_csharp_ext.dll")]
@@ -59,22 +55,22 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len);
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
+ byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len);
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
+ byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len);
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
+ byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
@@ -82,28 +78,27 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len);
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
+ byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_destroy(IntPtr call);
-
private CallSafeHandle()
{
}
@@ -115,12 +110,12 @@ namespace Grpc.Core.Internal
public void StartUnary(byte[] payload, CompletionCallbackDelegate callback)
{
- AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length)));
+ AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length)));
}
public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback)
{
- grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong) payload.Length));
+ grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length));
}
public void StartClientStreaming(CompletionCallbackDelegate callback)
@@ -130,7 +125,7 @@ namespace Grpc.Core.Internal
public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback)
{
- AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong) payload.Length)));
+ AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length)));
}
public void StartDuplexStreaming(CompletionCallbackDelegate callback)
@@ -140,7 +135,7 @@ namespace Grpc.Core.Internal
public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
{
- AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong) payload.Length)));
+ AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length)));
}
public void StartSendCloseFromClient(CompletionCallbackDelegate callback)
@@ -173,19 +168,20 @@ namespace Grpc.Core.Internal
AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail));
}
- protected override bool ReleaseHandle()
- {
+ protected override bool ReleaseHandle()
+ {
grpcsharp_call_destroy(handle);
- return true;
- }
+ return true;
+ }
private static void AssertCallOk(GRPCCallError callError)
{
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
- private static UInt32 GetFlags(bool buffered) {
+ private static UInt32 GetFlags(bool buffered)
+ {
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
}
- }
+ }
} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs b/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs
index fb59e86e2d..286c54f2c4 100644
--- a/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs
@@ -1,5 +1,4 @@
#region Copyright notice and license
-
// Copyright 2015, Google Inc.
// All rights reserved.
//
@@ -28,40 +27,40 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
#endregion
-
using System;
using Grpc.Core.Internal;
namespace Grpc.Core.Internal
{
internal class ClientStreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
- {
+ {
readonly AsyncCall<TWrite, TRead> call;
public ClientStreamingInputObserver(AsyncCall<TWrite, TRead> call)
- {
+ {
this.call = call;
- }
-
- public void OnCompleted()
- {
+ }
+ public void OnCompleted()
+ {
+ var taskSource = new AsyncCompletionTaskSource();
+ call.StartSendCloseFromClient(taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
- call.SendCloseFromClientAsync().Wait();
- }
+ taskSource.Task.Wait();
+ }
- public void OnError(Exception error)
- {
- throw new InvalidOperationException("This should never be called.");
- }
+ public void OnError(Exception error)
+ {
+ throw new InvalidOperationException("This should never be called.");
+ }
- public void OnNext(TWrite value)
- {
+ public void OnNext(TWrite value)
+ {
+ var taskSource = new AsyncCompletionTaskSource();
+ call.StartSendMessage(value, taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
- call.SendMessageAsync(value).Wait();
- }
- }
+ taskSource.Task.Wait();
+ }
+ }
}
-
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index 3f01fdbfd0..6bff923c55 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -1,5 +1,4 @@
#region Copyright notice and license
-
// Copyright 2015, Google Inc.
// All rights reserved.
//
@@ -28,9 +27,7 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
#endregion
-
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
@@ -40,8 +37,8 @@ namespace Grpc.Core.Internal
/// <summary>
/// grpc_completion_queue from <grpc/grpc.h>
/// </summary>
- internal class CompletionQueueSafeHandle : SafeHandleZeroIsInvalid
- {
+ internal class CompletionQueueSafeHandle : SafeHandleZeroIsInvalid
+ {
[DllImport("grpc_csharp_ext.dll")]
static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create();
@@ -73,11 +70,11 @@ namespace Grpc.Core.Internal
grpcsharp_completion_queue_shutdown(this);
}
- protected override bool ReleaseHandle()
+ protected override bool ReleaseHandle()
{
grpcsharp_completion_queue_destroy(handle);
- return true;
- }
- }
+ return true;
+ }
+ }
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs
index 08d9921475..9873dc9c71 100644
--- a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs
@@ -1,5 +1,4 @@
#region Copyright notice and license
-
// Copyright 2015, Google Inc.
// All rights reserved.
//
@@ -28,9 +27,7 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
#endregion
-
using System;
using Grpc.Core.Internal;
@@ -40,32 +37,36 @@ namespace Grpc.Core.Internal
/// Observer that writes all arriving messages to a call abstraction (in blocking fashion)
/// and then halfcloses the call. Used for server-side call handling.
/// </summary>
- internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite>
- {
- readonly AsyncCall<TWrite, TRead> call;
+ internal class ServerStreamingOutputObserver<TRequest, TResponse> : IObserver<TResponse>
+ {
+ readonly AsyncCallServer<TRequest, TResponse> call;
- public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call)
- {
+ public ServerStreamingOutputObserver(AsyncCallServer<TRequest, TResponse> call)
+ {
this.call = call;
- }
+ }
- public void OnCompleted()
- {
+ public void OnCompleted()
+ {
+ var taskSource = new AsyncCompletionTaskSource();
+ call.StartSendStatusFromServer(new Status(StatusCode.OK, ""), taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
- call.SendStatusFromServerAsync(new Status(StatusCode.OK, "")).Wait();
- }
+ taskSource.Task.Wait();
+ }
- public void OnError(Exception error)
- {
+ public void OnError(Exception error)
+ {
// TODO: implement this...
- throw new InvalidOperationException("This should never be called.");
- }
+ throw new InvalidOperationException("This should never be called.");
+ }
- public void OnNext(TWrite value)
- {
+ public void OnNext(TResponse value)
+ {
+ var taskSource = new AsyncCompletionTaskSource();
+ call.StartSendMessage(value, taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
- call.SendMessageAsync(value).Wait();
- }
- }
+ taskSource.Task.Wait();
+ }
+ }
}
diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs
index b191ecde94..e6efd66f13 100644
--- a/src/csharp/Grpc.Core/Internal/Timespec.cs
+++ b/src/csharp/Grpc.Core/Internal/Timespec.cs
@@ -1,5 +1,4 @@
#region Copyright notice and license
-
// Copyright 2015, Google Inc.
// All rights reserved.
//
@@ -28,21 +27,19 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
#endregion
-
using System;
using System.Runtime.InteropServices;
using System.Threading;
namespace Grpc.Core.Internal
{
- /// <summary>
- /// gpr_timespec from grpc/support/time.h
- /// </summary>
- [StructLayout(LayoutKind.Sequential)]
- internal struct Timespec
- {
+ /// <summary>
+ /// gpr_timespec from grpc/support/time.h
+ /// </summary>
+ [StructLayout(LayoutKind.Sequential)]
+ internal struct Timespec
+ {
const int nanosPerSecond = 1000 * 1000 * 1000;
const int nanosPerTick = 100;
@@ -54,23 +51,22 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern int gprsharp_sizeof_timespec();
-
// TODO: revisit this.
- // NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8
+ // NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8
// so IntPtr seems to have the right size to work on both.
- public System.IntPtr tv_sec;
- public System.IntPtr tv_nsec;
+ public System.IntPtr tv_sec;
+ public System.IntPtr tv_nsec;
- /// <summary>
- /// Timespec a long time in the future.
- /// </summary>
- public static Timespec InfFuture
- {
- get
- {
+ /// <summary>
+ /// Timespec a long time in the future.
+ /// </summary>
+ public static Timespec InfFuture
+ {
+ get
+ {
return gprsharp_inf_future();
- }
- }
+ }
+ }
public static Timespec Now
{
@@ -92,7 +88,8 @@ namespace Grpc.Core.Internal
/// Creates a GPR deadline from current instant and given timeout.
/// </summary>
/// <returns>The from timeout.</returns>
- public static Timespec DeadlineFromTimeout(TimeSpan timeout) {
+ public static Timespec DeadlineFromTimeout(TimeSpan timeout)
+ {
if (timeout == Timeout.InfiniteTimeSpan)
{
return Timespec.InfFuture;
@@ -100,7 +97,8 @@ namespace Grpc.Core.Internal
return Timespec.Now.Add(timeout);
}
- public Timespec Add(TimeSpan timeSpan) {
+ public Timespec Add(TimeSpan timeSpan)
+ {
long nanos = tv_nsec.ToInt64() + (timeSpan.Ticks % TimeSpan.TicksPerSecond) * nanosPerTick;
long overflow_sec = (nanos > nanosPerSecond) ? 1 : 0;
@@ -109,6 +107,6 @@ namespace Grpc.Core.Internal
result.tv_sec = new IntPtr(tv_sec.ToInt64() + (timeSpan.Ticks / TimeSpan.TicksPerSecond) + overflow_sec);
return result;
}
- }
+ }
}
diff --git a/src/csharp/Grpc.Core/OperationFailedException.cs b/src/csharp/Grpc.Core/OperationFailedException.cs
new file mode 100644
index 0000000000..34a8c95a85
--- /dev/null
+++ b/src/csharp/Grpc.Core/OperationFailedException.cs
@@ -0,0 +1,48 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Thrown when gRPC operation fails.
+ /// </summary>
+ public class OperationFailedException : Exception
+ {
+ public OperationFailedException(string message) : base(message)
+ {
+ }
+ }
+}
+
diff --git a/src/csharp/Grpc.Core/ServerCallHandler.cs b/src/csharp/Grpc.Core/ServerCallHandler.cs
index 289f97aece..3eb8422f57 100644
--- a/src/csharp/Grpc.Core/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/ServerCallHandler.cs
@@ -32,7 +32,9 @@
#endregion
using System;
+using System.Linq;
using Grpc.Core.Internal;
+using Grpc.Core.Utils;
namespace Grpc.Core
{
@@ -54,17 +56,17 @@ namespace Grpc.Core
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
- var asyncCall = new AsyncCall<TResponse, TRequest>(
+ var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
- asyncCall.InitializeServer(call);
+ asyncCall.Initialize(call);
- var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync();
+ var requestObserver = new RecordingObserver<TRequest>();
+ var finishedTask = asyncCall.ServerSideCallAsync(requestObserver);
- var request = asyncCall.ReceiveMessageAsync().Result;
-
- var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
+ var request = requestObserver.ToList().Result.Single();
+ var responseObserver = new ServerStreamingOutputObserver<TRequest, TResponse>(asyncCall);
handler(request, responseObserver);
finishedTask.Wait();
@@ -85,15 +87,15 @@ namespace Grpc.Core
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
- var asyncCall = new AsyncCall<TResponse, TRequest>(
+ var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
- asyncCall.InitializeServer(call);
+ asyncCall.Initialize(call);
- var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
+ var responseObserver = new ServerStreamingOutputObserver<TRequest,TResponse>(asyncCall);
var requestObserver = handler(responseObserver);
- var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver);
+ var finishedTask = asyncCall.ServerSideCallAsync(requestObserver);
finishedTask.Wait();
}
}
@@ -103,17 +105,15 @@ namespace Grpc.Core
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
// We don't care about the payload type here.
- AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
+ var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload);
+ asyncCall.Initialize(call);
- asyncCall.InitializeServer(call);
-
- var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>());
+ var finishedTask = asyncCall.ServerSideCallAsync(new NullObserver<byte[]>());
- // TODO: this makes the call finish before all reads can be done which causes trouble
- // in AsyncCall.HandleReadFinished callback. Revisit this.
- asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, "No such method.")).Wait();
+ // TODO: check result of the completion status.
+ asyncCall.StartSendStatusFromServer(new Status(StatusCode.Unimplemented, "No such method."), new AsyncCompletionDelegate((error) => {}));
finishedTask.Wait();
}
diff --git a/src/csharp/Grpc.Core/Status.cs b/src/csharp/Grpc.Core/Status.cs
index 5ea1df7b48..080bbdc2f5 100644
--- a/src/csharp/Grpc.Core/Status.cs
+++ b/src/csharp/Grpc.Core/Status.cs
@@ -1,5 +1,4 @@
#region Copyright notice and license
-
// Copyright 2015, Google Inc.
// All rights reserved.
//
@@ -28,7 +27,6 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
#endregion
using System;
@@ -36,34 +34,40 @@ using System.Runtime.InteropServices;
namespace Grpc.Core
{
- /// <summary>
- /// Represents RPC result.
- /// </summary>
- public struct Status
- {
- readonly StatusCode statusCode;
- readonly string detail;
+ /// <summary>
+ /// Represents RPC result.
+ /// </summary>
+ public struct Status
+ {
+ readonly StatusCode statusCode;
+ readonly string detail;
- public Status(StatusCode statusCode, string detail)
- {
- this.statusCode = statusCode;
- this.detail = detail;
- }
+ public Status(StatusCode statusCode, string detail)
+ {
+ this.statusCode = statusCode;
+ this.detail = detail;
+ }
- public StatusCode StatusCode
- {
- get
- {
- return statusCode;
- }
- }
+ /// <summary>
+ /// Gets the gRPC status code. OK indicates success, all other values indicate an error.
+ /// </summary>
+ public StatusCode StatusCode
+ {
+ get
+ {
+ return statusCode;
+ }
+ }
- public string Detail
- {
- get
- {
- return detail;
- }
- }
- }
+ /// <summary>
+ /// Gets the detail.
+ /// </summary>
+ public string Detail
+ {
+ get
+ {
+ return detail;
+ }
+ }
+ }
}
diff --git a/src/csharp/Grpc.Core/Utils/Preconditions.cs b/src/csharp/Grpc.Core/Utils/Preconditions.cs
new file mode 100644
index 0000000000..b17ce42117
--- /dev/null
+++ b/src/csharp/Grpc.Core/Utils/Preconditions.cs
@@ -0,0 +1,113 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Diagnostics;
+
+namespace Grpc.Core.Utils
+{
+ public static class Preconditions
+ {
+ /// <summary>
+ /// Throws ArgumentException if condition is false.
+ /// </summary>
+ public static void CheckArgument(bool condition)
+ {
+ if (!condition)
+ {
+ throw new ArgumentException();
+ }
+ }
+
+ /// <summary>
+ /// Throws ArgumentException with given message if condition is false.
+ /// </summary>
+ public static void CheckArgument(bool condition, string errorMessage)
+ {
+ if (!condition)
+ {
+ throw new ArgumentException(errorMessage);
+ }
+ }
+
+ /// <summary>
+ /// Throws NullReferenceException if reference is null.
+ /// </summary>
+ public static T CheckNotNull<T> (T reference)
+ {
+ if (reference == null)
+ {
+ throw new NullReferenceException();
+ }
+ return reference;
+ }
+
+ /// <summary>
+ /// Throws NullReferenceException with given message if reference is null.
+ /// </summary>
+ public static T CheckNotNull<T> (T reference, string errorMessage)
+ {
+ if (reference == null)
+ {
+ throw new NullReferenceException(errorMessage);
+ }
+ return reference;
+ }
+
+ /// <summary>
+ /// Throws InvalidOperationException if condition is false.
+ /// </summary>
+ public static void CheckState(bool condition)
+ {
+ if (!condition)
+ {
+ throw new InvalidOperationException();
+ }
+ }
+
+ /// <summary>
+ /// Throws InvalidOperationException with given message if condition is false.
+ /// </summary>
+ public static void CheckState(bool condition, string errorMessage)
+ {
+ if (!condition)
+ {
+ throw new InvalidOperationException(errorMessage);
+ }
+ }
+ }
+}
+
diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
index 95a4678bb8..f5956bd33e 100644
--- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs
+++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
@@ -1,5 +1,4 @@
#region Copyright notice and license
-
// Copyright 2015, Google Inc.
// All rights reserved.
//
@@ -28,9 +27,7 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
#endregion
-
using System;
using System.Runtime.InteropServices;
using System.Threading;
@@ -38,25 +35,25 @@ using Grpc.Core;
namespace math
{
- class MathClient
+ class MathClient
{
- public static void Main (string[] args)
- {
+ public static void Main(string[] args)
+ {
GrpcEnvironment.Initialize();
- using (Channel channel = new Channel("127.0.0.1:23456"))
- {
- MathGrpc.IMathServiceClient stub = new MathGrpc.MathServiceClientStub(channel);
- MathExamples.DivExample(stub);
+ using (Channel channel = new Channel("127.0.0.1:23456"))
+ {
+ MathGrpc.IMathServiceClient stub = new MathGrpc.MathServiceClientStub(channel);
+ MathExamples.DivExample(stub);
MathExamples.FibExample(stub);
- MathExamples.SumExample(stub);
+ MathExamples.SumExample(stub);
- MathExamples.DivManyExample(stub);
- }
+ MathExamples.DivManyExample(stub);
+ }
GrpcEnvironment.Shutdown();
- }
- }
+ }
+ }
}
diff --git a/src/csharp/Grpc.Examples/MathExamples.cs b/src/csharp/Grpc.Examples/MathExamples.cs
index 97c91b1b1b..134270f6f7 100644
--- a/src/csharp/Grpc.Examples/MathExamples.cs
+++ b/src/csharp/Grpc.Examples/MathExamples.cs
@@ -1,5 +1,4 @@
#region Copyright notice and license
-
// Copyright 2015, Google Inc.
// All rights reserved.
//
@@ -28,7 +27,6 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
#endregion
using System;
@@ -39,59 +37,63 @@ using Grpc.Core.Utils;
namespace math
{
- public static class MathExamples
- {
- public static void DivExample(MathGrpc.IMathServiceClient stub)
- {
- DivReply result = stub.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build());
- Console.WriteLine("Div Result: " + result);
- }
-
- public static void DivAsyncExample(MathGrpc.IMathServiceClient stub)
- {
- Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
- DivReply result = call.Result;
- Console.WriteLine(result);
- }
-
- public static void DivAsyncWithCancellationExample(MathGrpc.IMathServiceClient stub)
- {
- Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
- DivReply result = call.Result;
- Console.WriteLine(result);
- }
-
- public static void FibExample(MathGrpc.IMathServiceClient stub)
- {
+ public static class MathExamples
+ {
+ public static void DivExample(MathGrpc.IMathServiceClient stub)
+ {
+ DivReply result = stub.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build());
+ Console.WriteLine("Div Result: " + result);
+ }
+
+ public static void DivAsyncExample(MathGrpc.IMathServiceClient stub)
+ {
+ Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
+ DivReply result = call.Result;
+ Console.WriteLine(result);
+ }
+
+ public static void DivAsyncWithCancellationExample(MathGrpc.IMathServiceClient stub)
+ {
+ Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
+ DivReply result = call.Result;
+ Console.WriteLine(result);
+ }
+
+ public static void FibExample(MathGrpc.IMathServiceClient stub)
+ {
var recorder = new RecordingObserver<Num>();
stub.Fib(new FibArgs.Builder { Limit = 5 }.Build(), recorder);
- List<Num> numbers = recorder.ToList().Result;
+ List<Num> numbers = recorder.ToList().Result;
Console.WriteLine("Fib Result: " + string.Join("|", recorder.ToList().Result));
- }
+ }
- public static void SumExample(MathGrpc.IMathServiceClient stub)
- {
- List<Num> numbers = new List<Num>{new Num.Builder { Num_ = 1 }.Build(),
- new Num.Builder { Num_ = 2 }.Build(),
- new Num.Builder { Num_ = 3 }.Build()};
+ public static void SumExample(MathGrpc.IMathServiceClient stub)
+ {
+ List<Num> numbers = new List<Num>
+ {new Num.Builder { Num_ = 1 }.Build(),
+ new Num.Builder { Num_ = 2 }.Build(),
+ new Num.Builder { Num_ = 3 }.Build()
+ };
var res = stub.Sum();
- foreach (var num in numbers) {
+ foreach (var num in numbers)
+ {
res.Inputs.OnNext(num);
}
res.Inputs.OnCompleted();
- Console.WriteLine("Sum Result: " + res.Task.Result);
- }
+ Console.WriteLine("Sum Result: " + res.Task.Result);
+ }
- public static void DivManyExample(MathGrpc.IMathServiceClient stub)
- {
- List<DivArgs> divArgsList = new List<DivArgs>{
- new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build(),
- new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(),
- new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
- };
+ public static void DivManyExample(MathGrpc.IMathServiceClient stub)
+ {
+ List<DivArgs> divArgsList = new List<DivArgs>
+ {
+ new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build(),
+ new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(),
+ new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
+ };
var recorder = new RecordingObserver<DivReply>();
@@ -102,30 +104,30 @@ namespace math
}
inputs.OnCompleted();
- Console.WriteLine("DivMany Result: " + string.Join("|", recorder.ToList().Result));
- }
+ Console.WriteLine("DivMany Result: " + string.Join("|", recorder.ToList().Result));
+ }
- public static void DependendRequestsExample(MathGrpc.IMathServiceClient stub)
- {
- var numberList = new List<Num>
- { new Num.Builder{ Num_ = 1 }.Build(),
- new Num.Builder{ Num_ = 2 }.Build(), new Num.Builder{ Num_ = 3 }.Build()
- };
+ public static void DependendRequestsExample(MathGrpc.IMathServiceClient stub)
+ {
+ var numberList = new List<Num>
+ { new Num.Builder{ Num_ = 1 }.Build(),
+ new Num.Builder{ Num_ = 2 }.Build(), new Num.Builder{ Num_ = 3 }.Build()
+ };
- numberList.ToObservable();
+ numberList.ToObservable();
- //IObserver<Num> numbers;
- //Task<Num> call = stub.Sum(out numbers);
- //foreach (var num in numberList)
- //{
- // numbers.OnNext(num);
- //}
- //numbers.OnCompleted();
+ //IObserver<Num> numbers;
+ //Task<Num> call = stub.Sum(out numbers);
+ //foreach (var num in numberList)
+ //{
+ // numbers.OnNext(num);
+ //}
+ //numbers.OnCompleted();
- //Num sum = call.Result;
+ //Num sum = call.Result;
- //DivReply result = stub.Div(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numberList.Count }.Build());
- }
- }
+ //DivReply result = stub.Div(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numberList.Count }.Build());
+ }
+ }
}
diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec
index ed26fef4a9..45cbacfeb0 100755
--- a/src/ruby/grpc.gemspec
+++ b/src/ruby/grpc.gemspec
@@ -21,14 +21,10 @@ Gem::Specification.new do |s|
s.require_paths = ['lib']
s.platform = Gem::Platform::RUBY
- s.add_dependency 'faraday', '~> 0.9'
s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1'
s.add_dependency 'googleauth', '~> 0.1'
s.add_dependency 'logging', '~> 1.8'
- s.add_dependency 'jwt', '~> 1.2.1'
s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests
- s.add_dependency 'multi_json', '1.10.1'
- s.add_dependency 'signet', '~> 0.6.0'
s.add_dependency 'xray', '~> 1.1'
s.add_development_dependency 'bundler', '~> 1.7'
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index f234984eec..01328d4a5b 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -39,6 +39,25 @@ module GRPC
# Default deadline is 5 seconds.
DEFAULT_DEADLINE = 5
+ # setup_channel is used by #initialize to constuct a channel from its
+ # arguments.
+ def self.setup_channel(alt_chan, host, creds, **kw)
+ unless alt_chan.nil?
+ fail(TypeError, '!Channel') unless alt_chan.is_a?(Core::Channel)
+ return alt_chan
+ end
+ return Core::Channel.new(host, kw) if creds.nil?
+ fail(TypeError, '!Credentials') unless creds.is_a?(Core::Credentials)
+ Core::Channel.new(host, kw, creds)
+ end
+
+ # check_update_metadata is used by #initialize verify that it's a Proc.
+ def self.check_update_metadata(update_metadata)
+ return update_metadata if update_metadata.nil?
+ fail(TypeError, '!is_a?Proc') unless update_metadata.is_a?(Proc)
+ update_metadata
+ end
+
# Creates a new ClientStub.
#
# Minimally, a stub is created with the just the host of the gRPC service
@@ -73,40 +92,17 @@ module GRPC
# @param update_metadata a func that updates metadata as described above
# @param kw [KeywordArgs]the channel arguments
def initialize(host, q,
- channel_override:nil,
+ channel_override: nil,
deadline: DEFAULT_DEADLINE,
creds: nil,
update_metadata: nil,
**kw)
- unless q.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
- end
+ fail(TypeError, '!CompletionQueue') unless q.is_a?(Core::CompletionQueue)
@queue = q
-
- # set the channel instance
- if !channel_override.nil?
- ch = channel_override
- fail(ArgumentError, 'not a Channel') unless ch.is_a? Core::Channel
- else
- if creds.nil?
- ch = Core::Channel.new(host, kw)
- elsif !creds.is_a?(Core::Credentials)
- fail(ArgumentError, 'not a Credentials')
- else
- ch = Core::Channel.new(host, kw, creds)
- end
- end
- @ch = ch
-
- @update_metadata = nil
- unless update_metadata.nil?
- unless update_metadata.is_a? Proc
- fail(ArgumentError, 'update_metadata is not a Proc')
- end
- @update_metadata = update_metadata
- end
-
- @host = host
+ @ch = ClientStub.setup_channel(channel_override, host, creds, **kw)
+ @update_metadata = ClientStub.check_update_metadata(update_metadata)
+ alt_host = kw[Core::Channel::SSL_TARGET]
+ @host = alt_host.nil? ? host : alt_host
@deadline = deadline
end
@@ -400,12 +396,7 @@ module GRPC
# @param deadline [TimeConst]
def new_active_call(ch, marshal, unmarshal, deadline = nil)
absolute_deadline = Core::TimeConsts.from_relative_time(deadline)
- # It should be OK to to pass the hostname:port to create_call, but at
- # the moment this fails a security check. This will be corrected.
- #
- # TODO: # remove this after create_call is updated
- host = @host.split(':')[0]
- call = @ch.create_call(ch, host, absolute_deadline)
+ call = @ch.create_call(ch, @host, absolute_deadline)
ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
started: false)
end
diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py
index f16682862c..9cf3c624c0 100644
--- a/test/compiler/python_plugin_test.py
+++ b/test/compiler/python_plugin_test.py
@@ -32,8 +32,10 @@ import contextlib
import errno
import itertools
import os
+import shutil
import subprocess
import sys
+import tempfile
import time
import unittest
@@ -55,8 +57,8 @@ DOES_NOT_MATTER_DELAY = 0
NO_DELAY = 0
LONG_DELAY = 1
-# Assigned in __main__.
-_build_mode = None
+# Build mode environment variable set by tools/run_tests/run_tests.py.
+_build_mode = os.environ['CONFIG']
class _ServicerMethods(object):
@@ -227,24 +229,26 @@ class PythonPluginTest(unittest.TestCase):
protoc_command = 'protoc'
# Ensure that the output directory exists.
- outdir = '../../gens/test/compiler/python'
- try:
- os.makedirs(outdir)
- except OSError as exception:
- if exception.errno != errno.EEXIST:
- raise
+ self.outdir = tempfile.mkdtemp()
# Invoke protoc with the plugin.
cmd = [
protoc_command,
'--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
'-I %s' % os.path.dirname(test_proto_filename),
- '--python_out=%s' % outdir,
- '--python-grpc_out=%s' % outdir,
+ '--python_out=%s' % self.outdir,
+ '--python-grpc_out=%s' % self.outdir,
os.path.basename(test_proto_filename),
]
subprocess.call(' '.join(cmd), shell=True)
- sys.path.append(outdir)
+ sys.path.append(self.outdir)
+
+ def tearDown(self):
+ try:
+ shutil.rmtree(self.outdir)
+ except OSError as exc:
+ if exc.errno != errno.ENOENT:
+ raise
# TODO(atash): Figure out which of theses tests is hanging flakily with small
# probability.
@@ -296,6 +300,8 @@ class PythonPluginTest(unittest.TestCase):
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testUnaryCallAsyncCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
@@ -325,6 +331,8 @@ class PythonPluginTest(unittest.TestCase):
expected_response, response = check
self.assertEqual(expected_response, response)
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testStreamingOutputCallExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = StreamingOutputRequest(test_pb2)
@@ -335,6 +343,8 @@ class PythonPluginTest(unittest.TestCase):
with self.assertRaises(exceptions.ExpirationError):
list(responses)
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testStreamingOutputCallCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = StreamingOutputRequest(test_pb2)
@@ -359,6 +369,8 @@ class PythonPluginTest(unittest.TestCase):
with self.assertRaises(exceptions.ServicerError):
next(responses)
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testStreamingInputCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
@@ -426,6 +438,8 @@ class PythonPluginTest(unittest.TestCase):
expected_response, response = check
self.assertEqual(expected_response, response)
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testFullDuplexCallExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = FullDuplexRequest(test_pb2)
@@ -436,6 +450,8 @@ class PythonPluginTest(unittest.TestCase):
with self.assertRaises(exceptions.ExpirationError):
list(responses)
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testFullDuplexCallCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
@@ -459,6 +475,8 @@ class PythonPluginTest(unittest.TestCase):
with self.assertRaises(exceptions.ServicerError):
next(responses)
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testHalfDuplexCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
@@ -502,14 +520,4 @@ class PythonPluginTest(unittest.TestCase):
if __name__ == '__main__':
os.chdir(os.path.dirname(sys.argv[0]))
- parser = argparse.ArgumentParser(
- description='Run Python compiler plugin test.')
- parser.add_argument(
- '--build_mode', dest='build_mode', type=str, default='dbg',
- help='The build mode of the targets to test, e.g. "dbg", "opt", "asan", '
- 'etc.')
- parser.add_argument('--port', dest='port', type=int, default=0)
- args, remainder = parser.parse_known_args()
- _build_mode = args.build_mode
- sys.argv[1:] = remainder
unittest.main()
diff --git a/tools/run_tests/python_tests.json b/tools/run_tests/python_tests.json
index 9e5b1365e6..4b43ee8357 100755
--- a/tools/run_tests/python_tests.json
+++ b/tools/run_tests/python_tests.json
@@ -1,18 +1,50 @@
[
- "grpc._adapter._blocking_invocation_inline_service_test",
- "grpc._adapter._c_test",
- "grpc._adapter._event_invocation_synchronous_event_service_test",
- "grpc._adapter._future_invocation_asynchronous_event_service_test",
- "grpc._adapter._links_test",
- "grpc._adapter._lonely_rear_link_test",
- "grpc._adapter._low_test",
- "grpc.early_adopter.implementations_test",
- "grpc.framework.assembly.implementations_test",
- "grpc.framework.base.packets.implementations_test",
- "grpc.framework.face.blocking_invocation_inline_service_test",
- "grpc.framework.face.event_invocation_synchronous_event_service_test",
- "grpc.framework.face.future_invocation_asynchronous_event_service_test",
- "grpc.framework.foundation._later_test",
- "grpc.framework.foundation._logging_pool_test"
+ {
+ "file": "test/compiler/python_plugin_test.py"
+ },
+ {
+ "module": "grpc._adapter._blocking_invocation_inline_service_test"
+ },
+ {
+ "module": "grpc._adapter._c_test"
+ },
+ {
+ "module": "grpc._adapter._event_invocation_synchronous_event_service_test"
+ },
+ {
+ "module": "grpc._adapter._future_invocation_asynchronous_event_service_test"
+ },
+ {
+ "module": "grpc._adapter._links_test"
+ },
+ {
+ "module": "grpc._adapter._lonely_rear_link_test"
+ },
+ {
+ "module": "grpc._adapter._low_test"
+ },
+ {
+ "module": "grpc.early_adopter.implementations_test"
+ },
+ {
+ "module": "grpc.framework.assembly.implementations_test"
+ },
+ {
+ "module": "grpc.framework.base.packets.implementations_test"
+ },
+ {
+ "module": "grpc.framework.face.blocking_invocation_inline_service_test"
+ },
+ {
+ "module": "grpc.framework.face.event_invocation_synchronous_event_service_test"
+ },
+ {
+ "module": "grpc.framework.face.future_invocation_asynchronous_event_service_test"
+ },
+ {
+ "module": "grpc.framework.foundation._later_test"
+ },
+ {
+ "module": "grpc.framework.foundation._logging_pool_test"
+ }
]
-
diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh
index 403862b0a0..fa1497aee4 100755
--- a/tools/run_tests/run_python.sh
+++ b/tools/run_tests/run_python.sh
@@ -36,4 +36,4 @@ cd $(dirname $0)/../..
root=`pwd`
export LD_LIBRARY_PATH=$root/libs/opt
source python2.7_virtual_environment/bin/activate
-python2.7 -B -m $*
+python2.7 -B $*
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index baad727e51..aee19cdc42 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -170,11 +170,16 @@ class PythonLanguage(object):
self._tests = json.load(f)
def test_specs(self, config, travis):
- return [config.job_spec(['tools/run_tests/run_python.sh', test], None)
- for test in self._tests]
+ modules = [config.job_spec(['tools/run_tests/run_python.sh', '-m',
+ test['module']], None)
+ for test in self._tests if 'module' in test]
+ files = [config.job_spec(['tools/run_tests/run_python.sh',
+ test['file']], None)
+ for test in self._tests if 'file' in test]
+ return files + modules
def make_targets(self):
- return ['static_c']
+ return ['static_c', 'grpc_python_plugin']
def build_steps(self):
return [['tools/run_tests/build_python.sh']]