diff options
Diffstat (limited to 'src')
30 files changed, 1093 insertions, 932 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index aa764cbb33..b73b000a1c 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -127,6 +127,8 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) { "class ServerContext;\n"; if (HasUnaryCalls(file)) { temp.append( + "template <class OutMessage> class ClientAsyncResponseReader;\n"); + temp.append( "template <class OutMessage> class ServerAsyncResponseWriter;\n"); } if (HasClientOnlyStreaming(file)) { @@ -160,7 +162,8 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) { } std::string GetSourceIncludes() { - return "#include <grpc++/channel_interface.h>\n" + return "#include <grpc++/async_unary_call.h>\n" + "#include <grpc++/channel_interface.h>\n" "#include <grpc++/impl/client_unary_call.h>\n" "#include <grpc++/impl/rpc_method.h>\n" "#include <grpc++/impl/rpc_service_method.h>\n" @@ -181,9 +184,9 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer, "::grpc::Status $Method$(::grpc::ClientContext* context, " "const $Request$& request, $Response$* response);\n"); printer->Print(*vars, - "void $Method$(::grpc::ClientContext* context, " - "const $Request$& request, $Response$* response, " - "::grpc::Status* status, " + "::grpc::ClientAsyncResponseReader< $Response$>* " + "$Method$(::grpc::ClientContext* context, " + "const $Request$& request, " "::grpc::CompletionQueue* cq, void* tag);\n"); } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, @@ -378,14 +381,15 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, "context, request, response);\n" "}\n\n"); printer->Print(*vars, - "void $Service$::Stub::$Method$(" - "::grpc::ClientContext* context, " - "const $Request$& request, $Response$* response, ::grpc::Status* status, " + "::grpc::ClientAsyncResponseReader< $Response$>* " + "$Service$::Stub::$Method$(::grpc::ClientContext* context, " + "const $Request$& request, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, - " ::grpc::AsyncUnaryCall(channel()," + " return new ClientAsyncResponseReader< $Response$>(" + "channel(), cq, " "::grpc::RpcMethod($Service$_method_names[$Idx$]), " - "context, request, response, status, cq, tag);\n" + "context, request, tag);\n" "}\n\n"); } else if (ClientOnlyStreaming(method)) { printer->Print( diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 9fb2819506..a1c3938a33 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -93,7 +93,7 @@ static int multipoll_with_epoll_pollset_maybe_work( /* If you want to ignore epoll's ability to sanely handle parallel pollers, * for a more apples-to-apples performance comparison with poll, add a - * if (pollset->counter == 0) { return 0 } + * if (pollset->counter != 0) { return 0; } * here. */ diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index b2e0fd215a..60e82d9dfa 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -313,7 +313,7 @@ static void oauth2_token_fetcher_destroy(grpc_credentials *creds) { grpc_mdelem_unref(c->access_token_md); } gpr_mu_destroy(&c->mu); - grpc_mdctx_orphan(c->md_ctx); + grpc_mdctx_unref(c->md_ctx); gpr_free(c); } @@ -587,7 +587,7 @@ static void fake_oauth2_destroy(grpc_credentials *creds) { if (c->access_token_md != NULL) { grpc_mdelem_unref(c->access_token_md); } - grpc_mdctx_orphan(c->md_ctx); + grpc_mdctx_unref(c->md_ctx); gpr_free(c); } @@ -897,7 +897,7 @@ static void iam_destroy(grpc_credentials *creds) { grpc_iam_credentials *c = (grpc_iam_credentials *)creds; grpc_mdelem_unref(c->token_md); grpc_mdelem_unref(c->authority_selector_md); - grpc_mdctx_orphan(c->md_ctx); + grpc_mdctx_unref(c->md_ctx); gpr_free(c); } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 89a6ba63b2..40caa93868 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -313,7 +313,6 @@ static void set_status_code(grpc_call *call, status_source source, } if (flush && !grpc_bbq_empty(&call->incoming_queue)) { - gpr_log(GPR_ERROR, "Flushing unread messages due to error status %d", status); grpc_bbq_flush(&call->incoming_queue); } } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index e308c60410..e38734c6a4 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -146,7 +146,7 @@ static void destroy_channel(void *p, int ok) { grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->path_string); grpc_mdstr_unref(channel->authority_string); - grpc_mdctx_orphan(channel->metadata_context); + grpc_mdctx_unref(channel->metadata_context); gpr_free(channel); } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 551ae27e61..5b2d0a5e5b 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -336,11 +336,9 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ -static void unref_transport(transport *t) { +static void destruct_transport(transport *t) { size_t i; - if (!gpr_unref(&t->refs)) return; - gpr_mu_lock(&t->mu); GPR_ASSERT(t->ep == NULL); @@ -380,9 +378,16 @@ static void unref_transport(transport *t) { grpc_sopb_destroy(&t->nuke_later_sopb); + grpc_mdctx_unref(t->metadata_context); + gpr_free(t); } +static void unref_transport(transport *t) { + if (!gpr_unref(&t->refs)) return; + destruct_transport(t); +} + static void ref_transport(transport *t) { gpr_ref(&t->refs); } static void init_transport(transport *t, grpc_transport_setup_callback setup, @@ -401,6 +406,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, gpr_ref_init(&t->refs, 2); gpr_mu_init(&t->mu); gpr_cv_init(&t->cv); + grpc_mdctx_ref(mdctx); t->metadata_context = mdctx; t->str_grpc_timeout = grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); @@ -1025,8 +1031,6 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, int had_outgoing; char buffer[GPR_LTOA_MIN_BUFSIZE]; - gpr_log(GPR_DEBUG, "cancel %d", id); - if (s) { /* clear out any unreported input & output: nobody cares anymore */ had_outgoing = s->outgoing_sopb.nops != 0; diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 3dc23e7de2..1c15716fad 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -79,7 +79,7 @@ typedef struct internal_metadata { struct grpc_mdctx { gpr_uint32 hash_seed; - int orphaned; + int refs; gpr_mu mu; @@ -114,7 +114,7 @@ static void unlock(grpc_mdctx *ctx) { mdelems on every unlock (instead of the usual 'I'm too loaded' trigger case), since otherwise we can be stuck waiting for a garbage collection that will never happen. */ - if (ctx->orphaned) { + if (ctx->refs == 0) { /* uncomment if you're having trouble diagnosing an mdelem leak to make things clearer (slows down destruction a lot, however) */ /* gc_mdtab(ctx); */ @@ -139,7 +139,7 @@ static void ref_md(internal_metadata *md) { grpc_mdctx *grpc_mdctx_create_with_seed(gpr_uint32 seed) { grpc_mdctx *ctx = gpr_malloc(sizeof(grpc_mdctx)); - ctx->orphaned = 0; + ctx->refs = 1; ctx->hash_seed = seed; gpr_mu_init(&ctx->mu); ctx->strtab = gpr_malloc(sizeof(internal_string *) * INITIAL_STRTAB_CAPACITY); @@ -197,10 +197,17 @@ static void metadata_context_destroy(grpc_mdctx *ctx) { gpr_free(ctx); } -void grpc_mdctx_orphan(grpc_mdctx *ctx) { +void grpc_mdctx_ref(grpc_mdctx *ctx) { lock(ctx); - GPR_ASSERT(!ctx->orphaned); - ctx->orphaned = 1; + GPR_ASSERT(ctx->refs > 0); + ctx->refs++; + unlock(ctx); +} + +void grpc_mdctx_unref(grpc_mdctx *ctx) { + lock(ctx); + GPR_ASSERT(ctx->refs > 0); + ctx->refs--; unlock(ctx); } diff --git a/src/core/transport/metadata.h b/src/core/transport/metadata.h index 430cae6847..7a56e34690 100644 --- a/src/core/transport/metadata.h +++ b/src/core/transport/metadata.h @@ -84,7 +84,8 @@ struct grpc_mdelem { /* Create/orphan a metadata context */ grpc_mdctx *grpc_mdctx_create(void); grpc_mdctx *grpc_mdctx_create_with_seed(gpr_uint32 seed); -void grpc_mdctx_orphan(grpc_mdctx *mdctx); +void grpc_mdctx_ref(grpc_mdctx *mdctx); +void grpc_mdctx_unref(grpc_mdctx *mdctx); /* Test only accessors to internal state - only for testing this code - do not rely on it outside of metadata_test.c */ diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc index a5ef989946..684b3cbadb 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/src/cpp/client/client_unary_call.cc @@ -61,30 +61,4 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, return status; } -class ClientAsyncRequest final : public CallOpBuffer { - public: - bool FinalizeResult(void **tag, bool *status) override { - bool r = CallOpBuffer::FinalizeResult(tag, status); - delete this; - return r; - } -}; - -void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method, - ClientContext *context, - const google::protobuf::Message &request, - google::protobuf::Message *result, Status *status, - CompletionQueue *cq, void *tag) { - ClientAsyncRequest *buf = new ClientAsyncRequest; - buf->Reset(tag); - Call call(channel->CreateCall(method, context, cq)); - buf->AddSendInitialMetadata(context); - buf->AddSendMessage(request); - buf->AddRecvInitialMetadata(context); - buf->AddRecvMessage(result); - buf->AddClientSendClose(); - buf->AddClientRecvStatus(context, status); - call.PerformOps(buf); -} - } // namespace grpc diff --git a/src/csharp/GrpcApi/MathGrpc.cs b/src/csharp/GrpcApi/MathGrpc.cs index caea1608ec..44e704e496 100644 --- a/src/csharp/GrpcApi/MathGrpc.cs +++ b/src/csharp/GrpcApi/MathGrpc.cs @@ -81,7 +81,7 @@ namespace math Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)); - Task Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken)); + void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken)); ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken)); @@ -109,10 +109,10 @@ namespace math return Calls.AsyncUnaryCall(call, request, token); } - public Task Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken)) + public void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken)) { var call = new Google.GRPC.Core.Call<FibArgs, Num>(fibMethod, channel); - return Calls.AsyncServerStreamingCall(call, request, responseObserver, token); + Calls.AsyncServerStreamingCall(call, request, responseObserver, token); } public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken)) diff --git a/src/csharp/GrpcApi/TestServiceGrpc.cs b/src/csharp/GrpcApi/TestServiceGrpc.cs index 6534a44ef4..64d5c09563 100644 --- a/src/csharp/GrpcApi/TestServiceGrpc.cs +++ b/src/csharp/GrpcApi/TestServiceGrpc.cs @@ -99,7 +99,7 @@ namespace grpc.testing Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken)); - Task StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)); + void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)); ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)); @@ -141,9 +141,9 @@ namespace grpc.testing return Calls.AsyncUnaryCall(call, request, token); } - public Task StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) { + public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) { var call = new Google.GRPC.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(streamingOutputCallMethod, channel); - return Calls.AsyncServerStreamingCall(call, request, responseObserver, token); + Calls.AsyncServerStreamingCall(call, request, responseObserver, token); } public ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)) diff --git a/src/csharp/GrpcApiTests/MathClientServerTests.cs b/src/csharp/GrpcApiTests/MathClientServerTests.cs index bd298b0932..9056142097 100644 --- a/src/csharp/GrpcApiTests/MathClientServerTests.cs +++ b/src/csharp/GrpcApiTests/MathClientServerTests.cs @@ -64,6 +64,15 @@ namespace math.Tests client = MathGrpc.NewStub(channel); } + [TestFixtureTearDown] + public void Cleanup() + { + channel.Dispose(); + + server.ShutdownAsync().Wait(); + GrpcEnvironment.Shutdown(); + } + [Test] public void Div1() { @@ -136,15 +145,6 @@ namespace math.Tests CollectionAssert.AreEqual(new long[] {3, 4, 3}, result.ConvertAll((divReply) => divReply.Quotient)); CollectionAssert.AreEqual(new long[] {1, 16, 1}, result.ConvertAll((divReply) => divReply.Remainder)); } - - [TestFixtureTearDown] - public void Cleanup() - { - channel.Dispose(); - - server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); - } } } diff --git a/src/csharp/GrpcCore/Calls.cs b/src/csharp/GrpcCore/Calls.cs index d89d9a16f9..e5ddd879d6 100644 --- a/src/csharp/GrpcCore/Calls.cs +++ b/src/csharp/GrpcCore/Calls.cs @@ -47,50 +47,42 @@ namespace Google.GRPC.Core { public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) { - //TODO: implement this in real synchronous style once new GRPC C core API is available. - return AsyncUnaryCall(call, req, token).Result; + //TODO: implement this in real synchronous style. + try { + return AsyncUnaryCall(call, req, token).Result; + } catch(AggregateException ae) { + foreach (var e in ae.InnerExceptions) + { + if (e is RpcException) + { + throw e; + } + } + throw; + } } public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); - asyncCall.Initialize(call.Channel, call.MethodName); - asyncCall.Start(false, GetCompletionQueue()); - - await asyncCall.WriteAsync(req); - await asyncCall.WritesCompletedAsync(); - - TResponse response = await asyncCall.ReadAsync(); - - Status status = await asyncCall.Finished; - - if (status.StatusCode != StatusCode.GRPC_STATUS_OK) - { - throw new RpcException(status); - } - return response; + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + return await asyncCall.UnaryCallAsync(req); } - public static async Task AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token) + public static void AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token) { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); - asyncCall.Initialize(call.Channel, call.MethodName); - asyncCall.Start(false, GetCompletionQueue()); - asyncCall.StartReadingToStream(outputs); - - await asyncCall.WriteAsync(req); - await asyncCall.WritesCompletedAsync(); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + asyncCall.StartServerStreamingCall(req, outputs); } public static ClientStreamingAsyncResult<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); - asyncCall.Initialize(call.Channel, call.MethodName); - asyncCall.Start(false, GetCompletionQueue()); - - var task = asyncCall.ReadAsync(); - var inputs = new StreamingInputObserver<TRequest, TResponse>(asyncCall); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + var task = asyncCall.ClientStreamingCallAsync(); + var inputs = new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall); return new ClientStreamingAsyncResult<TRequest, TResponse>(task, inputs); } @@ -102,12 +94,10 @@ namespace Google.GRPC.Core public static IObserver<TRequest> DuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObserver<TResponse> outputs, CancellationToken token) { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); - asyncCall.Initialize(call.Channel, call.MethodName); - asyncCall.Start(false, GetCompletionQueue()); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); - asyncCall.StartReadingToStream(outputs); - var inputs = new StreamingInputObserver<TRequest, TResponse>(asyncCall); - return inputs; + asyncCall.StartDuplexStreamingCall(outputs); + return new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall); } private static CompletionQueueSafeHandle GetCompletionQueue() { diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj index 34b9f6dfb8..ee76b742ce 100644 --- a/src/csharp/GrpcCore/GrpcCore.csproj +++ b/src/csharp/GrpcCore/GrpcCore.csproj @@ -47,21 +47,21 @@ <Compile Include="Internal\ChannelSafeHandle.cs" /> <Compile Include="Internal\CompletionQueueSafeHandle.cs" /> <Compile Include="Internal\Enums.cs" /> - <Compile Include="Internal\Event.cs" /> <Compile Include="Internal\SafeHandleZeroIsInvalid.cs" /> <Compile Include="Internal\Timespec.cs" /> <Compile Include="Internal\GrpcThreadPool.cs" /> <Compile Include="Internal\AsyncCall.cs" /> <Compile Include="Internal\ServerSafeHandle.cs" /> - <Compile Include="Internal\StreamingInputObserver.cs" /> <Compile Include="Method.cs" /> <Compile Include="ServerCalls.cs" /> <Compile Include="ServerCallHandler.cs" /> - <Compile Include="Internal\ServerWritingObserver.cs" /> <Compile Include="Marshaller.cs" /> <Compile Include="ServerServiceDefinition.cs" /> <Compile Include="Utils\RecordingObserver.cs" /> <Compile Include="Utils\RecordingQueue.cs" /> + <Compile Include="Internal\ClientStreamingInputObserver.cs" /> + <Compile Include="Internal\ServerStreamingOutputObserver.cs" /> + <Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/GrpcCore/GrpcEnvironment.cs b/src/csharp/GrpcCore/GrpcEnvironment.cs index c4f030267d..55a6cac8f6 100644 --- a/src/csharp/GrpcCore/GrpcEnvironment.cs +++ b/src/csharp/GrpcCore/GrpcEnvironment.cs @@ -42,7 +42,7 @@ namespace Google.GRPC.Core /// </summary> public class GrpcEnvironment { - const int THREAD_POOL_SIZE = 1; + const int THREAD_POOL_SIZE = 4; [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_init(); diff --git a/src/csharp/GrpcCore/Internal/AsyncCall.cs b/src/csharp/GrpcCore/Internal/AsyncCall.cs index d5f3239e1e..ce0ba30d53 100644 --- a/src/csharp/GrpcCore/Internal/AsyncCall.cs +++ b/src/csharp/GrpcCore/Internal/AsyncCall.cs @@ -2,11 +2,11 @@ // Copyright 2015, Google Inc. // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -16,7 +16,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -42,171 +42,177 @@ using Google.GRPC.Core.Internal; namespace Google.GRPC.Core.Internal { /// <summary> - /// Listener for call events that can be delivered from a completion queue. + /// Handles native call lifecycle and provides convenience methods. /// </summary> - internal interface ICallEventListener { - - void OnClientMetadata(); - - void OnRead(byte[] payload); - - void OnWriteAccepted(GRPCOpError error); - - void OnFinishAccepted(GRPCOpError error); - - // ignore the status on server - void OnFinished(Status status); - } - - /// <summary> - /// Handle native call lifecycle and provides convenience methods. - /// </summary> - internal class AsyncCall<TWrite, TRead>: ICallEventListener, IDisposable + internal class AsyncCall<TWrite, TRead> { readonly Func<TWrite, byte[]> serializer; readonly Func<byte[], TRead> deserializer; - // TODO: make sure the delegate doesn't get garbage collected while - // native callbacks are in the completion queue. - readonly EventCallbackDelegate callbackHandler; + readonly CompletionCallbackDelegate unaryResponseHandler; + readonly CompletionCallbackDelegate finishedHandler; + readonly CompletionCallbackDelegate writeFinishedHandler; + readonly CompletionCallbackDelegate readFinishedHandler; + readonly CompletionCallbackDelegate halfclosedHandler; + readonly CompletionCallbackDelegate finishedServersideHandler; object myLock = new object(); - bool disposed; + GCHandle gchandle; CallSafeHandle call; + bool disposed; + + bool server; bool started; bool errorOccured; - bool cancelRequested; + bool readingDone; bool halfcloseRequested; bool halfclosed; - bool doneWithReading; - Nullable<Status> finishedStatus; + bool finished; + // Completion of a pending write if not null. TaskCompletionSource<object> writeTcs; + + // Completion of a pending read if not null. TaskCompletionSource<TRead> readTcs; - TaskCompletionSource<object> halfcloseTcs = new TaskCompletionSource<object>(); - TaskCompletionSource<Status> finishedTcs = new TaskCompletionSource<Status>(); + // Completion of a pending halfclose if not null. + TaskCompletionSource<object> halfcloseTcs; + + // Completion of a pending unary response if not null. + TaskCompletionSource<TRead> unaryResponseTcs; + + // Set after status is received on client. Only used for server streaming and duplex streaming calls. + Nullable<Status> finishedStatus; + TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); + + // For streaming, the reads will be delivered to this observer. IObserver<TRead> readObserver; public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) { this.serializer = serializer; this.deserializer = deserializer; - this.callbackHandler = HandleEvent; + this.unaryResponseHandler = HandleUnaryResponse; + this.finishedHandler = HandleFinished; + this.writeFinishedHandler = HandleWriteFinished; + this.readFinishedHandler = HandleReadFinished; + this.halfclosedHandler = HandleHalfclosed; + this.finishedServersideHandler = HandleFinishedServerside; } - public Task WriteAsync(TWrite msg) + public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) { - return StartWrite(msg, false).Task; + InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false); } - public Task WritesCompletedAsync() + public void InitializeServer(CallSafeHandle call) { - WritesDone(); - return halfcloseTcs.Task; + InitializeInternal(call, true); } - public Task WriteStatusAsync(Status status) + public Task<TRead> UnaryCallAsync(TWrite msg) { - WriteStatus(status); - return halfcloseTcs.Task; - } + lock (myLock) + { + started = true; + halfcloseRequested = true; + readingDone = true; - public Task<TRead> ReadAsync() - { - return StartRead().Task; - } + // TODO: handle serialization error... + byte[] payload = serializer(msg); - public Task Halfclosed - { - get - { - return halfcloseTcs.Task; + unaryResponseTcs = new TaskCompletionSource<TRead>(); + call.StartUnary(payload, unaryResponseHandler); + + return unaryResponseTcs.Task; } } - public Task<Status> Finished + public Task<TRead> ClientStreamingCallAsync() { - get + lock (myLock) { - return finishedTcs.Task; + started = true; + readingDone = true; + + unaryResponseTcs = new TaskCompletionSource<TRead>(); + call.StartClientStreaming(unaryResponseHandler); + + return unaryResponseTcs.Task; } } - /// <summary> - /// Initiates reading to given observer. - /// </summary> - public void StartReadingToStream(IObserver<TRead> readObserver) { + public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver) + { lock (myLock) { - CheckStarted(); - if (this.readObserver != null) - { - throw new InvalidOperationException("Already registered an observer."); - } + started = true; + halfcloseRequested = true; + this.readObserver = readObserver; - StartRead(); - } - } - public void Initialize(Channel channel, String methodName) { - lock (myLock) - { - this.call = CallSafeHandle.Create(channel.Handle, methodName, channel.Target, Timespec.InfFuture); + // TODO: handle serialization error... + byte[] payload = serializer(msg); + + call.StartServerStreaming(payload, finishedHandler); + + ReceiveMessageAsync(); } } - public void InitializeServer(CallSafeHandle call) + public void StartDuplexStreamingCall(IObserver<TRead> readObserver) { - lock(myLock) + lock (myLock) { - this.call = call; + started = true; + + this.readObserver = readObserver; + + call.StartDuplexStreaming(finishedHandler); + + ReceiveMessageAsync(); } } - // Client only - public void Start(bool buffered, CompletionQueueSafeHandle cq) + public Task ServerSideUnaryRequestCallAsync() { lock (myLock) { - if (started) - { - throw new InvalidOperationException("Already started."); - } - - call.Invoke(cq, buffered, callbackHandler, callbackHandler); started = true; + call.StartServerSide(finishedServersideHandler); + return finishedServersideTcs.Task; } } - // Server only - public void Accept(CompletionQueueSafeHandle cq) + public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver) { lock (myLock) { - if (started) + started = true; + call.StartServerSide(finishedServersideHandler); + + if (this.readObserver != null) { - throw new InvalidOperationException("Already started."); + throw new InvalidOperationException("Already registered an observer."); } + this.readObserver = readObserver; + ReceiveMessageAsync(); - call.ServerAccept(cq, callbackHandler); - call.ServerEndInitialMetadata(0); - started = true; + return finishedServersideTcs.Task; } } - public TaskCompletionSource<object> StartWrite(TWrite msg, bool buffered) + public Task SendMessageAsync(TWrite msg) { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); CheckNoError(); - CheckCancelNotRequested(); - if (halfcloseRequested || halfclosed) + if (halfcloseRequested) { throw new InvalidOperationException("Already halfclosed."); } @@ -219,63 +225,62 @@ namespace Google.GRPC.Core.Internal // TODO: wrap serialization... byte[] payload = serializer(msg); - call.StartWrite(payload, buffered, callbackHandler); + call.StartSendMessage(payload, writeFinishedHandler); writeTcs = new TaskCompletionSource<object>(); - return writeTcs; + return writeTcs.Task; } } - // client only - public void WritesDone() + public Task SendCloseFromClientAsync() { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); CheckNoError(); - CheckCancelNotRequested(); - if (halfcloseRequested || halfclosed) + if (halfcloseRequested) { throw new InvalidOperationException("Already halfclosed."); } - call.WritesDone(callbackHandler); + call.StartSendCloseFromClient(halfclosedHandler); + halfcloseRequested = true; + halfcloseTcs = new TaskCompletionSource<object>(); + return halfcloseTcs.Task; } } - // server only - public void WriteStatus(Status status) + public Task SendStatusFromServerAsync(Status status) { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); CheckNoError(); - CheckCancelNotRequested(); - if (halfcloseRequested || halfclosed) + if (halfcloseRequested) { throw new InvalidOperationException("Already halfclosed."); } - call.StartWriteStatus(status, callbackHandler); + call.StartSendStatusFromServer(status, halfclosedHandler); halfcloseRequested = true; + halfcloseTcs = new TaskCompletionSource<object>(); + return halfcloseTcs.Task; } } - public TaskCompletionSource<TRead> StartRead() + public Task<TRead> ReceiveMessageAsync() { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); CheckNoError(); - // TODO: add check for not cancelled? - - if (doneWithReading) + if (readingDone) { throw new InvalidOperationException("Already read the last message."); } @@ -285,10 +290,10 @@ namespace Google.GRPC.Core.Internal throw new InvalidOperationException("Only one read can be pending at a time"); } - call.StartRead(callbackHandler); + call.StartReceiveMessage(readFinishedHandler); readTcs = new TaskCompletionSource<TRead>(); - return readTcs; + return readTcs.Task; } } @@ -296,9 +301,8 @@ namespace Google.GRPC.Core.Internal { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); - cancelRequested = true; } // grpc_call_cancel is threadsafe @@ -309,218 +313,304 @@ namespace Google.GRPC.Core.Internal { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); - cancelRequested = true; } // grpc_call_cancel_with_status is threadsafe call.CancelWithStatus(status); } - public void OnClientMetadata() + private void InitializeInternal(CallSafeHandle call, bool server) { - // TODO: implement.... + lock (myLock) + { + // Make sure this object and the delegated held by it will not be garbage collected + // before we release this handle. + gchandle = GCHandle.Alloc(this); + this.call = call; + this.server = server; + } } - public void OnRead(byte[] payload) + private void CheckStarted() { - TaskCompletionSource<TRead> oldTcs = null; - IObserver<TRead> observer = null; - lock (myLock) + if (!started) { - oldTcs = readTcs; - readTcs = null; - if (payload == null) - { - doneWithReading = true; - } - observer = readObserver; + throw new InvalidOperationException("Call not started"); } + } - // TODO: wrap deserialization... - TRead msg = payload != null ? deserializer(payload) : default(TRead); - - oldTcs.SetResult(msg); - - // TODO: make sure we deliver reads in the right order. + private void CheckNotDisposed() + { + if (disposed) + { + throw new InvalidOperationException("Call has already been disposed."); + } + } - if (observer != null) + private void CheckNoError() + { + if (errorOccured) { - if (payload != null) - { - // TODO: wrap to handle exceptions - observer.OnNext(msg); + throw new InvalidOperationException("Error occured when processing call."); + } + } - // start a new read - StartRead(); - } - else + private bool ReleaseResourcesIfPossible() + { + if (!disposed && call != null) + { + if (halfclosed && readingDone && finished) { - // TODO: wrap to handle exceptions; - observer.OnCompleted(); + ReleaseResources(); + return true; } + } + return false; + } + private void ReleaseResources() + { + if (call != null) { + call.Dispose(); } + gchandle.Free(); + disposed = true; } - public void OnWriteAccepted(GRPCOpError error) + private void CompleteStreamObserver(Status status) { - TaskCompletionSource<object> oldTcs = null; - lock (myLock) + if (status.StatusCode != StatusCode.GRPC_STATUS_OK) { - UpdateErrorOccured(error); - oldTcs = writeTcs; - writeTcs = null; + // TODO: wrap to handle exceptions; + readObserver.OnError(new RpcException(status)); + } else { + // TODO: wrap to handle exceptions; + readObserver.OnCompleted(); } + } - if (errorOccured) + /// <summary> + /// Handler for unary response completion. + /// </summary> + private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr) + { + try { - // TODO: use the right type of exception... - oldTcs.SetException(new Exception("Write failed")); - } - else + TaskCompletionSource<TRead> tcs; + lock(myLock) + { + finished = true; + halfclosed = true; + tcs = unaryResponseTcs; + + ReleaseResourcesIfPossible(); + } + + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + if (error != GRPCOpError.GRPC_OP_OK) + { + tcs.SetException(new RpcException( + new Status(StatusCode.GRPC_STATUS_INTERNAL, "Internal error occured.") + )); + return; + } + + var status = ctx.GetReceivedStatus(); + if (status.StatusCode != StatusCode.GRPC_STATUS_OK) + { + tcs.SetException(new RpcException(status)); + return; + } + + // TODO: handle deserialize error... + var msg = deserializer(ctx.GetReceivedMessage()); + tcs.SetResult(msg); + } + catch(Exception e) { - // TODO: where does the continuation run? - oldTcs.SetResult(null); + Console.WriteLine("Caught exception in a native handler: " + e); } } - public void OnFinishAccepted(GRPCOpError error) + private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) { - lock (myLock) + try { - UpdateErrorOccured(error); - halfclosed = true; - } + TaskCompletionSource<object> oldTcs = null; + lock (myLock) + { + oldTcs = writeTcs; + writeTcs = null; + } - if (errorOccured) - { - halfcloseTcs.SetException(new Exception("Halfclose failed")); + if (errorOccured) + { + // TODO: use the right type of exception... + oldTcs.SetException(new Exception("Write failed")); + } + else + { + // TODO: where does the continuation run? + oldTcs.SetResult(null); + } } - else + catch(Exception e) { - halfcloseTcs.SetResult(null); + Console.WriteLine("Caught exception in a native handler: " + e); } - } - public void OnFinished(Status status) + private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) { - lock (myLock) + try { - finishedStatus = status; + lock (myLock) + { + halfclosed = true; - DisposeResourcesIfNeeded(); - } - finishedTcs.SetResult(status); + ReleaseResourcesIfPossible(); + } - } + if (error != GRPCOpError.GRPC_OP_OK) + { + halfcloseTcs.SetException(new Exception("Halfclose failed")); - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); + } + else + { + halfcloseTcs.SetResult(null); + } + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } } - protected virtual void Dispose(bool disposing) + private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) { - if (!disposed) + try { - if (disposing) + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + var payload = ctx.GetReceivedMessage(); + + TaskCompletionSource<TRead> oldTcs = null; + IObserver<TRead> observer = null; + + Nullable<Status> status = null; + + lock (myLock) { - if (call != null) + oldTcs = readTcs; + readTcs = null; + if (payload == null) { - call.Dispose(); + readingDone = true; } + observer = readObserver; + status = finishedStatus; } - disposed = true; - } - } - private void UpdateErrorOccured(GRPCOpError error) - { - if (error == GRPCOpError.GRPC_OP_ERROR) - { - errorOccured = true; - } - } + // TODO: wrap deserialization... + TRead msg = payload != null ? deserializer(payload) : default(TRead); - private void CheckStarted() - { - if (!started) - { - throw new InvalidOperationException("Call not started"); - } - } + oldTcs.SetResult(msg); - private void CheckNoError() - { - if (errorOccured) + // TODO: make sure we deliver reads in the right order. + + if (observer != null) + { + if (payload != null) + { + // TODO: wrap to handle exceptions + observer.OnNext(msg); + + // start a new read + ReceiveMessageAsync(); + } + else + { + if (!server) + { + if (status.HasValue) + { + CompleteStreamObserver(status.Value); + } + } + else + { + // TODO: wrap to handle exceptions.. + observer.OnCompleted(); + } + // TODO: completeStreamObserver serverside... + } + } + } + catch(Exception e) { - throw new InvalidOperationException("Error occured when processing call."); + Console.WriteLine("Caught exception in a native handler: " + e); } } - private void CheckNotFinished() + private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) { - if (finishedStatus.HasValue) + try { - throw new InvalidOperationException("Already finished."); - } - } + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + var status = ctx.GetReceivedStatus(); - private void CheckCancelNotRequested() - { - if (cancelRequested) + bool wasReadingDone; + + lock (myLock) + { + finished = true; + finishedStatus = status; + + wasReadingDone = readingDone; + + ReleaseResourcesIfPossible(); + } + + if (wasReadingDone) { + CompleteStreamObserver(status); + } + + } + catch(Exception e) { - throw new InvalidOperationException("Cancel has been requested."); + Console.WriteLine("Caught exception in a native handler: " + e); } } - private void DisposeResourcesIfNeeded() + private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) { - if (call != null && started && finishedStatus.HasValue) + try { - // TODO: should we also wait for all the pending events to finish? - - call.Dispose(); - } - } + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - private void HandleEvent(IntPtr eventPtr) { - try { - var ev = new EventSafeHandleNotOwned(eventPtr); - switch (ev.GetCompletionType()) + lock(myLock) { - case GRPCCompletionType.GRPC_CLIENT_METADATA_READ: - OnClientMetadata(); - break; - - case GRPCCompletionType.GRPC_READ: - byte[] payload = ev.GetReadData(); - OnRead(payload); - break; + finished = true; - case GRPCCompletionType.GRPC_WRITE_ACCEPTED: - OnWriteAccepted(ev.GetWriteAccepted()); - break; + // TODO: because of the way server calls are implemented, we need to set + // reading done to true here. Should be fixed in the future. + readingDone = true; - case GRPCCompletionType.GRPC_FINISH_ACCEPTED: - OnFinishAccepted(ev.GetFinishAccepted()); - break; + ReleaseResourcesIfPossible(); + } + // TODO: handle error ... - case GRPCCompletionType.GRPC_FINISHED: - OnFinished(ev.GetFinished()); - break; + finishedServersideTcs.SetResult(null); - default: - throw new ArgumentException("Unexpected completion type"); - } - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } } -} +}
\ No newline at end of file diff --git a/src/csharp/GrpcCore/Internal/BatchContextSafeHandleNotOwned.cs b/src/csharp/GrpcCore/Internal/BatchContextSafeHandleNotOwned.cs new file mode 100644 index 0000000000..ddfd94a3b5 --- /dev/null +++ b/src/csharp/GrpcCore/Internal/BatchContextSafeHandleNotOwned.cs @@ -0,0 +1,96 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using Google.GRPC.Core; + +namespace Google.GRPC.Core.Internal +{ + /// <summary> + /// Not owned version of + /// grpcsharp_batch_context + /// </summary> + internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid + { + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen); + + [DllImport("grpc_csharp_ext.dll")] + static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char* + + [DllImport("grpc_csharp_ext.dll")] + static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char* + + public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false) + { + SetHandle(handle); + } + + public Status GetReceivedStatus() + { + // TODO: can the native method return string directly? + string details = Marshal.PtrToStringAnsi(grpcsharp_batch_context_recv_status_on_client_details(this)); + return new Status(grpcsharp_batch_context_recv_status_on_client_status(this), details); + } + + public byte[] GetReceivedMessage() + { + IntPtr len = grpcsharp_batch_context_recv_message_length(this); + if (len == new IntPtr(-1)) + { + return null; + } + byte[] data = new byte[(int) len]; + grpcsharp_batch_context_recv_message_to_buffer(this, data, new UIntPtr((ulong)data.Length)); + return data; + } + + public CallSafeHandle GetServerRpcNewCall() { + return grpcsharp_batch_context_server_rpc_new_call(this); + } + + public string GetServerRpcNewMethod() { + return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this)); + } + } +}
\ No newline at end of file diff --git a/src/csharp/GrpcCore/Internal/CallSafeHandle.cs b/src/csharp/GrpcCore/Internal/CallSafeHandle.cs index e9ccd8d5f9..55d66a62ca 100644 --- a/src/csharp/GrpcCore/Internal/CallSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/CallSafeHandle.cs @@ -2,11 +2,11 @@ // Copyright 2015, Google Inc. // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -16,7 +16,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -38,8 +38,8 @@ using Google.GRPC.Core; namespace Google.GRPC.Core.Internal { - // TODO: we need to make sure that the delegates are not collected before invoked. - internal delegate void EventCallbackDelegate(IntPtr eventPtr); + //TODO: rename the delegate + internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr); /// <summary> /// grpc_call from <grpc/grpc.h> @@ -49,142 +49,108 @@ namespace Google.GRPC.Core.Internal const UInt32 GRPC_WRITE_BUFFER_HINT = 1; [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_channel_create_call_old(ChannelSafeHandle channel, string method, string host, Timespec deadline); + static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_add_metadata(CallSafeHandle call, IntPtr metadata, UInt32 flags); + static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_invoke_old(CallSafeHandle call, CompletionQueueSafeHandle cq, IntPtr metadataReadTag, IntPtr finishedTag, UInt32 flags); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_invoke_old")] - static extern GRPCCallError grpcsharp_call_invoke_old_CALLBACK(CallSafeHandle call, CompletionQueueSafeHandle cq, - [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate metadataReadCallback, - [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate finishedCallback, - UInt32 flags); + static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_server_accept_old(CallSafeHandle call, CompletionQueueSafeHandle completionQueue, IntPtr finishedTag); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_server_accept_old")] - static extern GRPCCallError grpcsharp_call_server_accept_old_CALLBACK(CallSafeHandle call, CompletionQueueSafeHandle completionQueue, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate finishedCallback); + static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_server_end_initial_metadata_old(CallSafeHandle call, UInt32 flags); + static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); + static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description); + static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_start_write_status_old(CallSafeHandle call, StatusCode statusCode, string statusMessage, IntPtr tag); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_write_status_old")] - static extern GRPCCallError grpcsharp_call_start_write_status_old_CALLBACK(CallSafeHandle call, StatusCode statusCode, string statusMessage, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_writes_done_old(CallSafeHandle call, IntPtr tag); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_writes_done_old")] - static extern GRPCCallError grpcsharp_call_writes_done_old_CALLBACK(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_start_read_old(CallSafeHandle call, IntPtr tag); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_read_old")] - static extern GRPCCallError grpcsharp_call_start_read_old_CALLBACK(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage); [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_call_start_write_from_copied_buffer(CallSafeHandle call, - byte[] buffer, UIntPtr length, - IntPtr tag, UInt32 flags); + static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_write_from_copied_buffer")] - static extern void grpcsharp_call_start_write_from_copied_buffer_CALLBACK(CallSafeHandle call, - byte[] buffer, UIntPtr length, - [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback, - UInt32 flags); + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); - [DllImport("grpc_csharp_ext.dll")] + [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_call_destroy(IntPtr call); - private CallSafeHandle() - { - } - - /// <summary> - /// Creates a client call. - /// </summary> - public static CallSafeHandle Create(ChannelSafeHandle channel, string method, string host, Timespec deadline) - { - return grpcsharp_channel_create_call_old(channel, method, host, deadline); - } - - public void Invoke(CompletionQueueSafeHandle cq, IntPtr metadataReadTag, IntPtr finishedTag, bool buffered) - { - AssertCallOk(grpcsharp_call_invoke_old(this, cq, metadataReadTag, finishedTag, GetFlags(buffered))); - } - - public void Invoke(CompletionQueueSafeHandle cq, bool buffered, EventCallbackDelegate metadataReadCallback, EventCallbackDelegate finishedCallback) - { - AssertCallOk(grpcsharp_call_invoke_old_CALLBACK(this, cq, metadataReadCallback, finishedCallback, GetFlags(buffered))); - } - public void ServerAccept(CompletionQueueSafeHandle cq, IntPtr finishedTag) + private CallSafeHandle() { - AssertCallOk(grpcsharp_call_server_accept_old(this, cq, finishedTag)); } - public void ServerAccept(CompletionQueueSafeHandle cq, EventCallbackDelegate callback) + public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) { - AssertCallOk(grpcsharp_call_server_accept_old_CALLBACK(this, cq, callback)); + return grpcsharp_channel_create_call(channel, cq, method, host, deadline); } - public void ServerEndInitialMetadata(UInt32 flags) + public void StartUnary(byte[] payload, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_server_end_initial_metadata_old(this, flags)); + AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length))); } - public void StartWrite(byte[] payload, IntPtr tag, bool buffered) + public void StartClientStreaming(CompletionCallbackDelegate callback) { - grpcsharp_call_start_write_from_copied_buffer(this, payload, new UIntPtr((ulong) payload.Length), tag, GetFlags(buffered)); + AssertCallOk(grpcsharp_call_start_client_streaming(this, callback)); } - public void StartWrite(byte[] payload, bool buffered, EventCallbackDelegate callback) + public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback) { - grpcsharp_call_start_write_from_copied_buffer_CALLBACK(this, payload, new UIntPtr((ulong) payload.Length), callback, GetFlags(buffered)); + AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong) payload.Length))); } - public void StartWriteStatus(Status status, IntPtr tag) + public void StartDuplexStreaming(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_write_status_old(this, status.StatusCode, status.Detail, tag)); + AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback)); } - public void StartWriteStatus(Status status, EventCallbackDelegate callback) + public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_write_status_old_CALLBACK(this, status.StatusCode, status.Detail, callback)); + AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong) payload.Length))); } - public void WritesDone(IntPtr tag) + public void StartSendCloseFromClient(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_writes_done_old(this, tag)); + AssertCallOk(grpcsharp_call_send_close_from_client(this, callback)); } - public void WritesDone(EventCallbackDelegate callback) + public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_writes_done_old_CALLBACK(this, callback)); + AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail)); } - public void StartRead(IntPtr tag) + public void StartReceiveMessage(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_read_old(this, tag)); + AssertCallOk(grpcsharp_call_recv_message(this, callback)); } - public void StartRead(EventCallbackDelegate callback) + public void StartServerSide(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_read_old_CALLBACK(this, callback)); + AssertCallOk(grpcsharp_call_start_serverside(this, callback)); } public void Cancel() @@ -212,4 +178,4 @@ namespace Google.GRPC.Core.Internal return buffered ? 0 : GRPC_WRITE_BUFFER_HINT; } } -} +}
\ No newline at end of file diff --git a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs b/src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs index 60837de5e6..4d10a9bdf9 100644 --- a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs +++ b/src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs @@ -2,11 +2,11 @@ // Copyright 2015, Google Inc. // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -16,7 +16,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -36,19 +36,20 @@ using Google.GRPC.Core.Internal; namespace Google.GRPC.Core.Internal { - internal class StreamingInputObserver<TWrite, TRead> : IObserver<TWrite> + internal class ClientStreamingInputObserver<TWrite, TRead> : IObserver<TWrite> { readonly AsyncCall<TWrite, TRead> call; - public StreamingInputObserver(AsyncCall<TWrite, TRead> call) + public ClientStreamingInputObserver(AsyncCall<TWrite, TRead> call) { this.call = call; } public void OnCompleted() { + // TODO: how bad is the Wait here? - call.WritesCompletedAsync().Wait(); + call.SendCloseFromClientAsync().Wait(); } public void OnError(Exception error) @@ -59,7 +60,7 @@ namespace Google.GRPC.Core.Internal public void OnNext(TWrite value) { // TODO: how bad is the Wait here? - call.WriteAsync(value).Wait(); + call.SendMessageAsync(value).Wait(); } } } diff --git a/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs b/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs index 666f220b8c..5ea436df19 100644 --- a/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs @@ -46,12 +46,6 @@ namespace Google.GRPC.Core.Internal static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create(); [DllImport("grpc_csharp_ext.dll")] - static extern EventSafeHandle grpcsharp_completion_queue_pluck(CompletionQueueSafeHandle cq, IntPtr tag, Timespec deadline); - - [DllImport("grpc_csharp_ext.dll")] - static extern EventSafeHandle grpcsharp_completion_queue_next(CompletionQueueSafeHandle cq, Timespec deadline); - - [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq); [DllImport("grpc_csharp_ext.dll")] @@ -69,21 +63,11 @@ namespace Google.GRPC.Core.Internal return grpcsharp_completion_queue_create(); } - public EventSafeHandle Next(Timespec deadline) - { - return grpcsharp_completion_queue_next(this, deadline); - } - public GRPCCompletionType NextWithCallback() { return grpcsharp_completion_queue_next_with_callback(this); } - public EventSafeHandle Pluck(IntPtr tag, Timespec deadline) - { - return grpcsharp_completion_queue_pluck(this, tag, deadline); - } - public void Shutdown() { grpcsharp_completion_queue_shutdown(this); diff --git a/src/csharp/GrpcCore/Internal/Event.cs b/src/csharp/GrpcCore/Internal/Event.cs deleted file mode 100644 index 6116e0975a..0000000000 --- a/src/csharp/GrpcCore/Internal/Event.cs +++ /dev/null @@ -1,224 +0,0 @@ -#region Copyright notice and license - -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -#endregion - -using System; -using System.Runtime.InteropServices; -using Google.GRPC.Core; - -namespace Google.GRPC.Core.Internal -{ - /// <summary> - /// grpc_event from grpc/grpc.h - /// </summary> - internal class EventSafeHandle : SafeHandleZeroIsInvalid - { - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_event_finish(IntPtr ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCompletionType grpcsharp_event_type(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_event_call(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCOpError grpcsharp_event_write_accepted(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCOpError grpcsharp_event_finish_accepted(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern StatusCode grpcsharp_event_finished_status(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_finished_details(EventSafeHandle ev); // returns const char* - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_read_length(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_event_read_copy_to_buffer(EventSafeHandle ev, byte[] buffer, UIntPtr bufferLen); - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_server_rpc_new_method(EventSafeHandle ev); // returns const char* - - public GRPCCompletionType GetCompletionType() - { - return grpcsharp_event_type(this); - } - - public GRPCOpError GetWriteAccepted() - { - return grpcsharp_event_write_accepted(this); - } - - public GRPCOpError GetFinishAccepted() - { - return grpcsharp_event_finish_accepted(this); - } - - public Status GetFinished() - { - // TODO: can the native method return string directly? - string details = Marshal.PtrToStringAnsi(grpcsharp_event_finished_details(this)); - return new Status(grpcsharp_event_finished_status(this), details); - } - - public byte[] GetReadData() - { - IntPtr len = grpcsharp_event_read_length(this); - if (len == new IntPtr(-1)) - { - return null; - } - byte[] data = new byte[(int) len]; - grpcsharp_event_read_copy_to_buffer(this, data, new UIntPtr((ulong)data.Length)); - return data; - } - - public CallSafeHandle GetCall() { - return grpcsharp_event_call(this); - } - - public string GetServerRpcNewMethod() { - // TODO: can the native method return string directly? - return Marshal.PtrToStringAnsi(grpcsharp_event_server_rpc_new_method(this)); - } - - //TODO: client_metadata_read event type - - protected override bool ReleaseHandle() - { - grpcsharp_event_finish(handle); - return true; - } - } - - // TODO: this is basically c&p of EventSafeHandle. Unify! - /// <summary> - /// Not owned version of - /// grpc_event from grpc/grpc.h - /// </summary> - internal class EventSafeHandleNotOwned : SafeHandleZeroIsInvalid - { - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_event_finish(IntPtr ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCompletionType grpcsharp_event_type(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_event_call(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCOpError grpcsharp_event_write_accepted(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCOpError grpcsharp_event_finish_accepted(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern StatusCode grpcsharp_event_finished_status(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_finished_details(EventSafeHandleNotOwned ev); // returns const char* - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_read_length(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_event_read_copy_to_buffer(EventSafeHandleNotOwned ev, byte[] buffer, UIntPtr bufferLen); - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_server_rpc_new_method(EventSafeHandleNotOwned ev); // returns const char* - - public EventSafeHandleNotOwned() : base(false) - { - } - - public EventSafeHandleNotOwned(IntPtr handle) : base(false) - { - SetHandle(handle); - } - - public GRPCCompletionType GetCompletionType() - { - return grpcsharp_event_type(this); - } - - public GRPCOpError GetWriteAccepted() - { - return grpcsharp_event_write_accepted(this); - } - - public GRPCOpError GetFinishAccepted() - { - return grpcsharp_event_finish_accepted(this); - } - - public Status GetFinished() - { - // TODO: can the native method return string directly? - string details = Marshal.PtrToStringAnsi(grpcsharp_event_finished_details(this)); - return new Status(grpcsharp_event_finished_status(this), details); - } - - public byte[] GetReadData() - { - IntPtr len = grpcsharp_event_read_length(this); - if (len == new IntPtr(-1)) - { - return null; - } - byte[] data = new byte[(int) len]; - grpcsharp_event_read_copy_to_buffer(this, data, new UIntPtr((ulong)data.Length)); - return data; - } - - public CallSafeHandle GetCall() { - return grpcsharp_event_call(this); - } - - public string GetServerRpcNewMethod() { - // TODO: can the native method return string directly? - return Marshal.PtrToStringAnsi(grpcsharp_event_server_rpc_new_method(this)); - } - - //TODO: client_metadata_read event type - - protected override bool ReleaseHandle() - { - grpcsharp_event_finish(handle); - return true; - } - } -} diff --git a/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs b/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs index f8154fa250..634a0b2d72 100644 --- a/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs +++ b/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs @@ -48,7 +48,6 @@ namespace Google.GRPC.Core.Internal readonly object myLock = new object(); readonly List<Thread> threads = new List<Thread>(); readonly int poolSize; - readonly Action<EventSafeHandle> eventHandler; CompletionQueueSafeHandle cq; @@ -56,11 +55,6 @@ namespace Google.GRPC.Core.Internal this.poolSize = poolSize; } - internal GrpcThreadPool(int poolSize, Action<EventSafeHandle> eventHandler) { - this.poolSize = poolSize; - this.eventHandler = eventHandler; - } - public void Start() { lock (myLock) @@ -104,34 +98,19 @@ namespace Google.GRPC.Core.Internal } } - private Thread CreateAndStartThread(int i) { - Action body; - if (eventHandler != null) - { - body = ThreadBodyWithHandler; - } - else - { - body = ThreadBodyNoHandler; - } - var thread = new Thread(new ThreadStart(body)); + private Thread CreateAndStartThread(int i) + { + var thread = new Thread(new ThreadStart(RunHandlerLoop)); thread.IsBackground = false; thread.Start(); - if (eventHandler != null) - { - thread.Name = "grpc_server_newrpc " + i; - } - else - { - thread.Name = "grpc " + i; - } + thread.Name = "grpc " + i; return thread; } /// <summary> /// Body of the polling thread. /// </summary> - private void ThreadBodyNoHandler() + private void RunHandlerLoop() { GRPCCompletionType completionType; do @@ -140,22 +119,6 @@ namespace Google.GRPC.Core.Internal } while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN); Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting."); } - - /// <summary> - /// Body of the polling thread. - /// </summary> - private void ThreadBodyWithHandler() - { - GRPCCompletionType completionType; - do - { - using (EventSafeHandle ev = cq.Next(Timespec.InfFuture)) { - completionType = ev.GetCompletionType(); - eventHandler(ev); - } - } while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN); - Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting."); - } } } diff --git a/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs b/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs index 74a8ef7b6e..59f08d4ca8 100644 --- a/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs +++ b/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs @@ -56,6 +56,12 @@ namespace Google.GRPC.Core.Internal return handle == IntPtr.Zero; } } + + protected override bool ReleaseHandle() + { + // handle is not owned. + return true; + } } } diff --git a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs index c91de97ce3..047bde1add 100644 --- a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs @@ -38,24 +38,22 @@ using System.Collections.Concurrent; namespace Google.GRPC.Core.Internal { + // TODO: we need to make sure that the delegates are not collected before invoked. + internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr); + /// <summary> /// grpc_server from grpc/grpc.h /// </summary> internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid { - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_request_call_old")] - static extern GRPCCallError grpcsharp_server_request_call_old_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); - [DllImport("grpc_csharp_ext.dll")] - static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); + static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); - // TODO: check int representation size [DllImport("grpc_csharp_ext.dll")] - static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); + static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); - // TODO: check int representation size [DllImport("grpc_csharp_ext.dll")] - static extern int grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr); + static extern Int32 grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_server_start(ServerSafeHandle server); @@ -63,8 +61,9 @@ namespace Google.GRPC.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_server_shutdown(ServerSafeHandle server); + // TODO: get rid of the old callback style [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_shutdown_and_notify")] - static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_server_destroy(IntPtr server); @@ -81,7 +80,6 @@ namespace Google.GRPC.Core.Internal public int AddPort(string addr) { - // TODO: also grpc_server_add_secure_http2_port... return grpcsharp_server_add_http2_port(this, addr); } @@ -95,14 +93,14 @@ namespace Google.GRPC.Core.Internal grpcsharp_server_shutdown(this); } - public void ShutdownAndNotify(EventCallbackDelegate callback) + public void ShutdownAndNotify(ServerShutdownCallbackDelegate callback) { grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); } - public GRPCCallError RequestCall(EventCallbackDelegate callback) + public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) { - return grpcsharp_server_request_call_old_CALLBACK(this, callback); + return grpcsharp_server_request_call(this, cq, callback); } protected override bool ReleaseHandle() diff --git a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs b/src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs index 1d29864b9f..e9cb65cb3b 100644 --- a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs +++ b/src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs @@ -40,11 +40,11 @@ namespace Google.GRPC.Core.Internal /// Observer that writes all arriving messages to a call abstraction (in blocking fashion) /// and then halfcloses the call. Used for server-side call handling. /// </summary> - internal class ServerWritingObserver<TWrite, TRead> : IObserver<TWrite> + internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite> { readonly AsyncCall<TWrite, TRead> call; - public ServerWritingObserver(AsyncCall<TWrite, TRead> call) + public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call) { this.call = call; } @@ -52,19 +52,19 @@ namespace Google.GRPC.Core.Internal public void OnCompleted() { // TODO: how bad is the Wait here? - call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + call.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); } public void OnError(Exception error) { - // TODO: handle this... + // TODO: implement this... throw new InvalidOperationException("This should never be called."); } public void OnNext(TWrite value) { // TODO: how bad is the Wait here? - call.WriteAsync(value).Wait(); + call.SendMessageAsync(value).Wait(); } } } diff --git a/src/csharp/GrpcCore/Server.cs b/src/csharp/GrpcCore/Server.cs index 0882a61299..91842d8182 100644 --- a/src/csharp/GrpcCore/Server.cs +++ b/src/csharp/GrpcCore/Server.cs @@ -49,8 +49,8 @@ namespace Google.GRPC.Core { // TODO: make sure the delegate doesn't get garbage collected while // native callbacks are in the completion queue. - readonly EventCallbackDelegate newRpcHandler; - readonly EventCallbackDelegate serverShutdownHandler; + readonly ServerShutdownCallbackDelegate serverShutdownHandler; + readonly CompletionCallbackDelegate newServerRpcHandler; readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>(); readonly ServerSafeHandle handle; @@ -61,9 +61,8 @@ namespace Google.GRPC.Core public Server() { - // TODO: what is the tag for server shutdown? this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); - this.newRpcHandler = HandleNewRpc; + this.newServerRpcHandler = HandleNewServerRpc; this.serverShutdownHandler = HandleServerShutdown; } @@ -99,7 +98,7 @@ namespace Google.GRPC.Core { var rpcInfo = newRpcQueue.Take(); - Console.WriteLine("Server received RPC " + rpcInfo.Method); + //Console.WriteLine("Server received RPC " + rpcInfo.Method); IServerCallHandler callHandler; if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) @@ -138,23 +137,25 @@ namespace Google.GRPC.Core private void AllowOneRpc() { - AssertCallOk(handle.RequestCall(newRpcHandler)); + AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler)); } - private void HandleNewRpc(IntPtr eventPtr) - { - try - { - var ev = new EventSafeHandleNotOwned(eventPtr); - var rpcInfo = new NewRpcInfo(ev.GetCall(), ev.GetServerRpcNewMethod()); + private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) { + try { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + if (error != GRPCOpError.GRPC_OP_OK) { + // TODO: handle error + } + + var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod()); // after server shutdown, the callback returns with null call if (!rpcInfo.Call.IsInvalid) { newRpcQueue.Add(rpcInfo); } - } - catch (Exception e) - { + + } catch(Exception e) { Console.WriteLine("Caught exception in a native handler: " + e); } } diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs index bcce4a091f..48d1eaa335 100644 --- a/src/csharp/GrpcCore/ServerCallHandler.cs +++ b/src/csharp/GrpcCore/ServerCallHandler.cs @@ -59,15 +59,16 @@ namespace Google.GRPC.Core method.RequestMarshaller.Deserializer); asyncCall.InitializeServer(call); - asyncCall.Accept(cq); + + var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync(); - var request = asyncCall.ReadAsync().Result; + var request = asyncCall.ReceiveMessageAsync().Result; - var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall); + var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall); handler(request, responseObserver); - asyncCall.Halfclosed.Wait(); - asyncCall.Finished.Wait(); + finishedTask.Wait(); + } } @@ -89,16 +90,11 @@ namespace Google.GRPC.Core method.RequestMarshaller.Deserializer); asyncCall.InitializeServer(call); - asyncCall.Accept(cq); - var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall); + var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall); var requestObserver = handler(responseObserver); - - // feed the requests - asyncCall.StartReadingToStream(requestObserver); - - asyncCall.Halfclosed.Wait(); - asyncCall.Finished.Wait(); + var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver); + finishedTask.Wait(); } } @@ -110,12 +106,31 @@ namespace Google.GRPC.Core AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>( (payload) => payload, (payload) => payload); + asyncCall.InitializeServer(call); - asyncCall.Accept(cq); - asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); - asyncCall.Finished.Wait(); + var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>()); + + asyncCall.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); + + finishedTask.Wait(); + } + } + + internal class NullObserver<T> : IObserver<T> + { + public void OnCompleted() + { + } + + public void OnError(Exception error) + { } + + public void OnNext(T value) + { + } + } } diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs index 4401156520..ba43e4f6a0 100644 --- a/src/csharp/GrpcCoreTests/ClientServerTest.cs +++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs @@ -36,6 +36,7 @@ using NUnit.Framework; using Google.GRPC.Core; using Google.GRPC.Core.Internal; using System.Threading; +using System.Diagnostics; using System.Threading.Tasks; using Google.GRPC.Core.Utils; @@ -51,11 +52,21 @@ namespace Google.GRPC.Core.Tests Marshallers.StringMarshaller, Marshallers.StringMarshaller); - [Test] - public void EmptyCall() + [TestFixtureSetUp] + public void Init() { GrpcEnvironment.Initialize(); + } + + [TestFixtureTearDown] + public void Cleanup() + { + GrpcEnvironment.Shutdown(); + } + [Test] + public void UnaryCall() + { Server server = new Server(); server.AddServiceDefinition( ServerServiceDefinition.CreateBuilder("someService") @@ -69,19 +80,71 @@ namespace Google.GRPC.Core.Tests var call = new Call<string, string>(unaryEchoStringMethod, channel); Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken))); + Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken))); } server.ShutdownAsync().Wait(); + } - GrpcEnvironment.Shutdown(); + [Test] + public void UnaryCallPerformance() + { + Server server = new Server(); + server.AddServiceDefinition( + ServerServiceDefinition.CreateBuilder("someService") + .AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build()); + + int port = server.AddPort(host + ":0"); + server.Start(); + + using (Channel channel = new Channel(host + ":" + port)) + { + var call = new Call<string, string>(unaryEchoStringMethod, channel); + + var stopwatch = new Stopwatch(); + stopwatch.Start(); + for (int i = 0; i < 1000; i++) + { + Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); + } + stopwatch.Stop(); + Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms"); + } + + server.ShutdownAsync().Wait(); } - private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) { + [Test] + public void UnknownMethodHandler() + { + Server server = new Server(); + server.AddServiceDefinition( + ServerServiceDefinition.CreateBuilder("someService").Build()); + + int port = server.AddPort(host + ":0"); + server.Start(); + + using (Channel channel = new Channel(host + ":" + port)) + { + var call = new Call<string, string>(unaryEchoStringMethod, channel); + + try { + Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); + Assert.Fail(); + } catch(RpcException e) { + Assert.AreEqual(StatusCode.GRPC_STATUS_UNIMPLEMENTED, e.Status.StatusCode); + } + } + + server.ShutdownAsync().Wait(); + } + + private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) + { responseObserver.OnNext(request); responseObserver.OnCompleted(); } - } } diff --git a/src/csharp/README.md b/src/csharp/README.md index a16f1e719e..f56ddabda5 100755 --- a/src/csharp/README.md +++ b/src/csharp/README.md @@ -25,10 +25,11 @@ INSTALLATION AND USAGE: WINDOWS INSTALLATION AND USAGE: LINUX & MONO ------------------------------------ -- Compile and install the gRPC C Core library +- Compile and install the gRPC C# extension library (that will be used via + P/Invoke from C#). ``` -make shared_c -sudo make install +make grpc_csharp_ext +sudo make install_grpc_csharp_ext ``` - Prerequisites for development: Mono framework, MonoDevelop (IDE) diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index c7949af44e..304ee9cf34 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -32,9 +32,11 @@ */ #include <grpc/support/port_platform.h> +#include <grpc/support/alloc.h> #include <grpc/grpc.h> #include <grpc/support/log.h> #include <grpc/support/slice.h> +#include <grpc/support/string.h> #include <string.h> @@ -58,6 +60,139 @@ grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) { return bb; } +typedef void(GPR_CALLTYPE *callback_funcptr)(grpc_op_error op_error, + void *batch_context); + +/* + * Helper to maintain lifetime of batch op inputs and store batch op outputs. + */ +typedef struct gprcsharp_batch_context { + grpc_metadata_array send_initial_metadata; + grpc_byte_buffer *send_message; + struct { + grpc_metadata_array trailing_metadata; + char *status_details; + } send_status_from_server; + grpc_metadata_array recv_initial_metadata; + grpc_byte_buffer *recv_message; + struct { + grpc_metadata_array trailing_metadata; + grpc_status_code status; + char *status_details; + size_t status_details_capacity; + } recv_status_on_client; + int recv_close_on_server_cancelled; + struct { + grpc_call *call; + grpc_call_details call_details; + grpc_metadata_array request_metadata; + } server_rpc_new; + + /* callback will be called upon completion */ + callback_funcptr callback; + +} grpcsharp_batch_context; + +grpcsharp_batch_context *grpcsharp_batch_context_create() { + grpcsharp_batch_context *ctx = gpr_malloc(sizeof(grpcsharp_batch_context)); + memset(ctx, 0, sizeof(grpcsharp_batch_context)); + return ctx; +} + +/** + * Destroys metadata array including keys and values. + */ +void grpcsharp_metadata_array_destroy_recursive(grpc_metadata_array *array) { + if (!array->metadata) { + return; + } + /* TODO: destroy also keys and values */ + grpc_metadata_array_destroy(array); +} + +void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) { + if (!ctx) { + return; + } + grpcsharp_metadata_array_destroy_recursive(&(ctx->send_initial_metadata)); + + grpc_byte_buffer_destroy(ctx->send_message); + + grpcsharp_metadata_array_destroy_recursive( + &(ctx->send_status_from_server.trailing_metadata)); + gpr_free(ctx->send_status_from_server.status_details); + + grpc_metadata_array_destroy(&(ctx->recv_initial_metadata)); + + grpc_byte_buffer_destroy(ctx->recv_message); + + grpc_metadata_array_destroy(&(ctx->recv_status_on_client.trailing_metadata)); + gpr_free((void *)ctx->recv_status_on_client.status_details); + + /* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is + supposed + to take its ownership. */ + + grpc_call_details_destroy(&(ctx->server_rpc_new.call_details)); + grpc_metadata_array_destroy(&(ctx->server_rpc_new.request_metadata)); + + gpr_free(ctx); +} + +GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_batch_context_recv_message_length( + const grpcsharp_batch_context *ctx) { + if (!ctx->recv_message) { + return -1; + } + return grpc_byte_buffer_length(ctx->recv_message); +} + +/* + * Copies data from recv_message to a buffer. Fatal error occurs if + * buffer is too small. + */ +GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_recv_message_to_buffer( + const grpcsharp_batch_context *ctx, char *buffer, size_t buffer_len) { + grpc_byte_buffer_reader *reader; + gpr_slice slice; + size_t offset = 0; + + reader = grpc_byte_buffer_reader_create(ctx->recv_message); + + while (grpc_byte_buffer_reader_next(reader, &slice)) { + size_t len = GPR_SLICE_LENGTH(slice); + GPR_ASSERT(offset + len <= buffer_len); + memcpy(buffer + offset, GPR_SLICE_START_PTR(slice), + GPR_SLICE_LENGTH(slice)); + offset += len; + gpr_slice_unref(slice); + } + grpc_byte_buffer_reader_destroy(reader); +} + +GPR_EXPORT grpc_status_code GPR_CALLTYPE +grpcsharp_batch_context_recv_status_on_client_status( + const grpcsharp_batch_context *ctx) { + return ctx->recv_status_on_client.status; +} + +GPR_EXPORT const char *GPR_CALLTYPE +grpcsharp_batch_context_recv_status_on_client_details( + const grpcsharp_batch_context *ctx) { + return ctx->recv_status_on_client.status_details; +} + +GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_call( + const grpcsharp_batch_context *ctx) { + return ctx->server_rpc_new.call; +} + +GPR_EXPORT const char *GPR_CALLTYPE +grpcsharp_batch_context_server_rpc_new_method( + const grpcsharp_batch_context *ctx) { + return ctx->server_rpc_new.call_details.method; +} + /* Init & shutdown */ GPR_EXPORT void GPR_CALLTYPE grpcsharp_init(void) { grpc_init(); } @@ -71,18 +206,6 @@ grpcsharp_completion_queue_create(void) { return grpc_completion_queue_create(); } -GPR_EXPORT grpc_event *GPR_CALLTYPE -grpcsharp_completion_queue_next(grpc_completion_queue *cq, - gpr_timespec deadline) { - return grpc_completion_queue_next(cq, deadline); -} - -GPR_EXPORT grpc_event *GPR_CALLTYPE -grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag, - gpr_timespec deadline) { - return grpc_completion_queue_pluck(cq, tag, deadline); -} - GPR_EXPORT void GPR_CALLTYPE grpcsharp_completion_queue_shutdown(grpc_completion_queue *cq) { grpc_completion_queue_shutdown(cq); @@ -96,12 +219,18 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) { GPR_EXPORT grpc_completion_type GPR_CALLTYPE grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) { grpc_event *ev; + grpcsharp_batch_context *batch_context; grpc_completion_type t; void(GPR_CALLTYPE * callback)(grpc_event *); ev = grpc_completion_queue_next(cq, gpr_inf_future); t = ev->type; - if (ev->tag) { + if (t == GRPC_OP_COMPLETE && ev->tag) { + /* NEW API handler */ + batch_context = (grpcsharp_batch_context *)ev->tag; + batch_context->callback(ev->data.op_complete, batch_context); + grpcsharp_batch_context_destroy(batch_context); + } else if (ev->tag) { /* call the callback in ev->tag */ /* C forbids to cast object pointers to function pointers, so * we cast to intptr first. @@ -129,204 +258,286 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) { } GPR_EXPORT grpc_call *GPR_CALLTYPE -grpcsharp_channel_create_call_old(grpc_channel *channel, const char *method, - const char *host, gpr_timespec deadline) { - return grpc_channel_create_call_old(channel, method, host, deadline); +grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq, + const char *method, const char *host, + gpr_timespec deadline) { + return grpc_channel_create_call(channel, cq, method, host, deadline); } -/* Event */ +/* Timespec */ -GPR_EXPORT void GPR_CALLTYPE grpcsharp_event_finish(grpc_event *event) { - grpc_event_finish(event); -} +GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(void) { return gpr_now(); } -GPR_EXPORT grpc_completion_type GPR_CALLTYPE -grpcsharp_event_type(const grpc_event *event) { - return event->type; +GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_future(void) { + return gpr_inf_future; } -GPR_EXPORT grpc_op_error GPR_CALLTYPE -grpcsharp_event_write_accepted(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_WRITE_ACCEPTED); - return event->data.invoke_accepted; +GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) { + return sizeof(gpr_timespec); } -GPR_EXPORT grpc_op_error GPR_CALLTYPE -grpcsharp_event_finish_accepted(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_FINISH_ACCEPTED); - return event->data.finish_accepted; -} +/* Call */ -GPR_EXPORT grpc_status_code GPR_CALLTYPE -grpcsharp_event_finished_status(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_FINISHED); - return event->data.finished.status; +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_cancel(grpc_call *call) { + return grpc_call_cancel(call); } -GPR_EXPORT const char *GPR_CALLTYPE -grpcsharp_event_finished_details(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_FINISHED); - return event->data.finished.details; +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_call_cancel_with_status(grpc_call *call, grpc_status_code status, + const char *description) { + return grpc_call_cancel_with_status(call, status, description); } -GPR_EXPORT gpr_intptr GPR_CALLTYPE -grpcsharp_event_read_length(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_READ); - if (!event->data.read) { - return -1; - } - return grpc_byte_buffer_length(event->data.read); +GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) { + grpc_call_destroy(call); } -/* - * Copies data from read event to a buffer. Fatal error occurs if - * buffer is too small. - */ GPR_EXPORT void GPR_CALLTYPE -grpcsharp_event_read_copy_to_buffer(const grpc_event *event, char *buffer, - size_t buffer_len) { - grpc_byte_buffer_reader *reader; - gpr_slice slice; - size_t offset = 0; +grpcsharp_call_start_write_from_copied_buffer(grpc_call *call, + const char *buffer, size_t len, + void *tag, gpr_uint32 flags) { + grpc_byte_buffer *byte_buffer = string_to_byte_buffer(buffer, len); + GPR_ASSERT(grpc_call_start_write_old(call, byte_buffer, tag, flags) == + GRPC_CALL_OK); + grpc_byte_buffer_destroy(byte_buffer); +} - GPR_ASSERT(event->type == GRPC_READ); - reader = grpc_byte_buffer_reader_create(event->data.read); +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback, + const char *send_buffer, size_t send_buffer_len) { + /* TODO: don't use magic number */ + grpc_op ops[6]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; - GPR_ASSERT(event->data.read); - while (grpc_byte_buffer_reader_next(reader, &slice)) { - size_t len = GPR_SLICE_LENGTH(slice); - GPR_ASSERT(offset + len <= buffer_len); - memcpy(buffer + offset, GPR_SLICE_START_PTR(slice), - GPR_SLICE_LENGTH(slice)); - offset += len; - gpr_slice_unref(slice); - } - grpc_byte_buffer_reader_destroy(reader); -} + /* TODO: implement sending the metadata... */ + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + /* ctx->send_initial_metadata is already zeroed out. */ + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; -GPR_EXPORT grpc_call *GPR_CALLTYPE -grpcsharp_event_call(const grpc_event *event) { - /* we only allow this for newly incoming server calls. */ - GPR_ASSERT(event->type == GRPC_SERVER_RPC_NEW); - return event->call; -} + ops[1].op = GRPC_OP_SEND_MESSAGE; + ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); + ops[1].data.send_message = ctx->send_message; -GPR_EXPORT const char *GPR_CALLTYPE -grpcsharp_event_server_rpc_new_method(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_SERVER_RPC_NEW); - return event->data.server_rpc_new.method; -} + ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; -/* Timespec */ + ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); -GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(void) { return gpr_now(); } + ops[4].op = GRPC_OP_RECV_MESSAGE; + ops[4].data.recv_message = &(ctx->recv_message); -GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_future(void) { - return gpr_inf_future; -} + ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[5].data.recv_status_on_client.trailing_metadata = + &(ctx->recv_status_on_client.trailing_metadata); + ops[5].data.recv_status_on_client.status = + &(ctx->recv_status_on_client.status); + /* not using preallocation for status_details */ + ops[5].data.recv_status_on_client.status_details = + &(ctx->recv_status_on_client.status_details); + ops[5].data.recv_status_on_client.status_details_capacity = + &(ctx->recv_status_on_client.status_details_capacity); -GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) { - return sizeof(gpr_timespec); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } -/* Call */ - GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_add_metadata_old(grpc_call *call, grpc_metadata *metadata, - gpr_uint32 flags) { - return grpc_call_add_metadata_old(call, metadata, flags); +grpcsharp_call_start_client_streaming(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[4]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + /* TODO: implement sending the metadata... */ + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + /* ctx->send_initial_metadata is already zeroed out. */ + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + + ops[2].op = GRPC_OP_RECV_MESSAGE; + ops[2].data.recv_message = &(ctx->recv_message); + + ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[3].data.recv_status_on_client.trailing_metadata = + &(ctx->recv_status_on_client.trailing_metadata); + ops[3].data.recv_status_on_client.status = + &(ctx->recv_status_on_client.status); + /* not using preallocation for status_details */ + ops[3].data.recv_status_on_client.status_details = + &(ctx->recv_status_on_client.status_details); + ops[3].data.recv_status_on_client.status_details_capacity = + &(ctx->recv_status_on_client.status_details_capacity); + + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_invoke_old(grpc_call *call, grpc_completion_queue *cq, - void *metadata_read_tag, void *finished_tag, - gpr_uint32 flags) { - return grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag, flags); +grpcsharp_call_start_server_streaming(grpc_call *call, + callback_funcptr callback, + const char *send_buffer, + size_t send_buffer_len) { + /* TODO: don't use magic number */ + grpc_op ops[5]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + /* TODO: implement sending the metadata... */ + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + /* ctx->send_initial_metadata is already zeroed out. */ + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_SEND_MESSAGE; + ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); + ops[1].data.send_message = ctx->send_message; + + ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + + ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + + ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[4].data.recv_status_on_client.trailing_metadata = + &(ctx->recv_status_on_client.trailing_metadata); + ops[4].data.recv_status_on_client.status = + &(ctx->recv_status_on_client.status); + /* not using preallocation for status_details */ + ops[4].data.recv_status_on_client.status_details = + &(ctx->recv_status_on_client.status_details); + ops[4].data.recv_status_on_client.status_details_capacity = + &(ctx->recv_status_on_client.status_details_capacity); + + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_server_accept_old(grpc_call *call, grpc_completion_queue *cq, - void *finished_tag) { - return grpc_call_server_accept_old(call, cq, finished_tag); +grpcsharp_call_start_duplex_streaming(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[3]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + /* TODO: implement sending the metadata... */ + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + /* ctx->send_initial_metadata is already zeroed out. */ + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + + ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[2].data.recv_status_on_client.trailing_metadata = + &(ctx->recv_status_on_client.trailing_metadata); + ops[2].data.recv_status_on_client.status = + &(ctx->recv_status_on_client.status); + /* not using preallocation for status_details */ + ops[2].data.recv_status_on_client.status_details = + &(ctx->recv_status_on_client.status_details); + ops[2].data.recv_status_on_client.status_details_capacity = + &(ctx->recv_status_on_client.status_details_capacity); + + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_server_end_initial_metadata_old(grpc_call *call, - gpr_uint32 flags) { - return grpc_call_server_end_initial_metadata_old(call, flags); -} +grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback, + const char *send_buffer, size_t send_buffer_len) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; -GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_cancel(grpc_call *call) { - return grpc_call_cancel(call); -} + ops[0].op = GRPC_OP_SEND_MESSAGE; + ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); + ops[0].data.send_message = ctx->send_message; -GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_cancel_with_status(grpc_call *call, grpc_status_code status, - const char *description) { - return grpc_call_cancel_with_status(call, status, description); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_start_write_old(grpc_call *call, grpc_byte_buffer *byte_buffer, - void *tag, gpr_uint32 flags) { - return grpc_call_start_write_old(call, byte_buffer, tag, flags); +grpcsharp_call_send_close_from_client(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_start_write_status_old(grpc_call *call, - grpc_status_code status_code, - const char *status_message, void *tag) { - return grpc_call_start_write_status_old(call, status_code, status_message, - tag); +grpcsharp_call_send_status_from_server(grpc_call *call, + callback_funcptr callback, + grpc_status_code status_code, + const char *status_details) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; + ops[0].data.send_status_from_server.status = status_code; + ops[0].data.send_status_from_server.status_details = + gpr_strdup(status_details); + ops[0].data.send_status_from_server.trailing_metadata = NULL; + ops[0].data.send_status_from_server.trailing_metadata_count = 0; + + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_writes_done_old(grpc_call *call, void *tag) { - return grpc_call_writes_done_old(call, tag); +grpcsharp_call_recv_message(grpc_call *call, callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_RECV_MESSAGE; + ops[0].data.recv_message = &(ctx->recv_message); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_start_read_old(grpc_call *call, void *tag) { - return grpc_call_start_read_old(call, tag); -} +grpcsharp_call_start_serverside(grpc_call *call, callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[2]; -GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) { - grpc_call_destroy(call); -} + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; -GPR_EXPORT void GPR_CALLTYPE -grpcsharp_call_start_write_from_copied_buffer(grpc_call *call, - const char *buffer, size_t len, - void *tag, gpr_uint32 flags) { - grpc_byte_buffer *byte_buffer = string_to_byte_buffer(buffer, len); - GPR_ASSERT(grpc_call_start_write_old(call, byte_buffer, tag, flags) == - GRPC_CALL_OK); - grpc_byte_buffer_destroy(byte_buffer); -} + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; -/* Server */ + ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops[1].data.recv_close_on_server.cancelled = + (&ctx->recv_close_on_server_cancelled); -GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_server_request_call_old(grpc_server *server, void *tag_new) { - return grpc_server_request_call_old(server, tag_new); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } +/* Server */ + GPR_EXPORT grpc_server *GPR_CALLTYPE grpcsharp_server_create(grpc_completion_queue *cq, const grpc_channel_args *args) { return grpc_server_create(cq, args); } -GPR_EXPORT int GPR_CALLTYPE +GPR_EXPORT gpr_int32 GPR_CALLTYPE grpcsharp_server_add_http2_port(grpc_server *server, const char *addr) { return grpc_server_add_http2_port(server, addr); } -GPR_EXPORT int GPR_CALLTYPE -grpcsharp_server_add_secure_http2_port(grpc_server *server, const char *addr) { - return grpc_server_add_secure_http2_port(server, addr); -} - GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_start(grpc_server *server) { grpc_server_start(server); } @@ -343,3 +554,14 @@ grpcsharp_server_shutdown_and_notify(grpc_server *server, void *tag) { GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) { grpc_server_destroy(server); } + +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq, + callback_funcptr callback) { + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + return grpc_server_request_call( + server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details), + &(ctx->server_rpc_new.request_metadata), cq, ctx); +} |