aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/compiler/cpp_generator.cc22
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c2
-rw-r--r--src/core/security/credentials.c6
-rw-r--r--src/core/surface/call.c1
-rw-r--r--src/core/surface/channel.c2
-rw-r--r--src/core/transport/chttp2_transport.c14
-rw-r--r--src/core/transport/metadata.c19
-rw-r--r--src/core/transport/metadata.h3
-rw-r--r--src/cpp/client/client_unary_call.cc26
-rw-r--r--src/csharp/GrpcApi/MathGrpc.cs6
-rw-r--r--src/csharp/GrpcApi/TestServiceGrpc.cs6
-rw-r--r--src/csharp/GrpcApiTests/MathClientServerTests.cs18
-rw-r--r--src/csharp/GrpcCore/Calls.cs58
-rw-r--r--src/csharp/GrpcCore/GrpcCore.csproj6
-rw-r--r--src/csharp/GrpcCore/GrpcEnvironment.cs2
-rw-r--r--src/csharp/GrpcCore/Internal/AsyncCall.cs588
-rw-r--r--src/csharp/GrpcCore/Internal/BatchContextSafeHandleNotOwned.cs96
-rw-r--r--src/csharp/GrpcCore/Internal/CallSafeHandle.cs138
-rw-r--r--src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs (renamed from src/csharp/GrpcCore/Internal/StreamingInputObserver.cs)15
-rw-r--r--src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs16
-rw-r--r--src/csharp/GrpcCore/Internal/Event.cs224
-rw-r--r--src/csharp/GrpcCore/Internal/GrpcThreadPool.cs47
-rw-r--r--src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs6
-rw-r--r--src/csharp/GrpcCore/Internal/ServerSafeHandle.cs24
-rw-r--r--src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs (renamed from src/csharp/GrpcCore/Internal/ServerWritingObserver.cs)10
-rw-r--r--src/csharp/GrpcCore/Server.cs31
-rw-r--r--src/csharp/GrpcCore/ServerCallHandler.cs47
-rw-r--r--src/csharp/GrpcCoreTests/ClientServerTest.cs73
-rwxr-xr-xsrc/csharp/README.md7
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c512
30 files changed, 1093 insertions, 932 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index aa764cbb33..b73b000a1c 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -127,6 +127,8 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) {
"class ServerContext;\n";
if (HasUnaryCalls(file)) {
temp.append(
+ "template <class OutMessage> class ClientAsyncResponseReader;\n");
+ temp.append(
"template <class OutMessage> class ServerAsyncResponseWriter;\n");
}
if (HasClientOnlyStreaming(file)) {
@@ -160,7 +162,8 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) {
}
std::string GetSourceIncludes() {
- return "#include <grpc++/channel_interface.h>\n"
+ return "#include <grpc++/async_unary_call.h>\n"
+ "#include <grpc++/channel_interface.h>\n"
"#include <grpc++/impl/client_unary_call.h>\n"
"#include <grpc++/impl/rpc_method.h>\n"
"#include <grpc++/impl/rpc_service_method.h>\n"
@@ -181,9 +184,9 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer,
"::grpc::Status $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response);\n");
printer->Print(*vars,
- "void $Method$(::grpc::ClientContext* context, "
- "const $Request$& request, $Response$* response, "
- "::grpc::Status* status, "
+ "::grpc::ClientAsyncResponseReader< $Response$>* "
+ "$Method$(::grpc::ClientContext* context, "
+ "const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag);\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
@@ -378,14 +381,15 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"context, request, response);\n"
"}\n\n");
printer->Print(*vars,
- "void $Service$::Stub::$Method$("
- "::grpc::ClientContext* context, "
- "const $Request$& request, $Response$* response, ::grpc::Status* status, "
+ "::grpc::ClientAsyncResponseReader< $Response$>* "
+ "$Service$::Stub::$Method$(::grpc::ClientContext* context, "
+ "const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
- " ::grpc::AsyncUnaryCall(channel(),"
+ " return new ClientAsyncResponseReader< $Response$>("
+ "channel(), cq, "
"::grpc::RpcMethod($Service$_method_names[$Idx$]), "
- "context, request, response, status, cq, tag);\n"
+ "context, request, tag);\n"
"}\n\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 9fb2819506..a1c3938a33 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -93,7 +93,7 @@ static int multipoll_with_epoll_pollset_maybe_work(
/* If you want to ignore epoll's ability to sanely handle parallel pollers,
* for a more apples-to-apples performance comparison with poll, add a
- * if (pollset->counter == 0) { return 0 }
+ * if (pollset->counter != 0) { return 0; }
* here.
*/
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index b2e0fd215a..60e82d9dfa 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -313,7 +313,7 @@ static void oauth2_token_fetcher_destroy(grpc_credentials *creds) {
grpc_mdelem_unref(c->access_token_md);
}
gpr_mu_destroy(&c->mu);
- grpc_mdctx_orphan(c->md_ctx);
+ grpc_mdctx_unref(c->md_ctx);
gpr_free(c);
}
@@ -587,7 +587,7 @@ static void fake_oauth2_destroy(grpc_credentials *creds) {
if (c->access_token_md != NULL) {
grpc_mdelem_unref(c->access_token_md);
}
- grpc_mdctx_orphan(c->md_ctx);
+ grpc_mdctx_unref(c->md_ctx);
gpr_free(c);
}
@@ -897,7 +897,7 @@ static void iam_destroy(grpc_credentials *creds) {
grpc_iam_credentials *c = (grpc_iam_credentials *)creds;
grpc_mdelem_unref(c->token_md);
grpc_mdelem_unref(c->authority_selector_md);
- grpc_mdctx_orphan(c->md_ctx);
+ grpc_mdctx_unref(c->md_ctx);
gpr_free(c);
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 89a6ba63b2..40caa93868 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -313,7 +313,6 @@ static void set_status_code(grpc_call *call, status_source source,
}
if (flush && !grpc_bbq_empty(&call->incoming_queue)) {
- gpr_log(GPR_ERROR, "Flushing unread messages due to error status %d", status);
grpc_bbq_flush(&call->incoming_queue);
}
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index e308c60410..e38734c6a4 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -146,7 +146,7 @@ static void destroy_channel(void *p, int ok) {
grpc_mdstr_unref(channel->grpc_message_string);
grpc_mdstr_unref(channel->path_string);
grpc_mdstr_unref(channel->authority_string);
- grpc_mdctx_orphan(channel->metadata_context);
+ grpc_mdctx_unref(channel->metadata_context);
gpr_free(channel);
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 551ae27e61..5b2d0a5e5b 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -336,11 +336,9 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
-static void unref_transport(transport *t) {
+static void destruct_transport(transport *t) {
size_t i;
- if (!gpr_unref(&t->refs)) return;
-
gpr_mu_lock(&t->mu);
GPR_ASSERT(t->ep == NULL);
@@ -380,9 +378,16 @@ static void unref_transport(transport *t) {
grpc_sopb_destroy(&t->nuke_later_sopb);
+ grpc_mdctx_unref(t->metadata_context);
+
gpr_free(t);
}
+static void unref_transport(transport *t) {
+ if (!gpr_unref(&t->refs)) return;
+ destruct_transport(t);
+}
+
static void ref_transport(transport *t) { gpr_ref(&t->refs); }
static void init_transport(transport *t, grpc_transport_setup_callback setup,
@@ -401,6 +406,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
gpr_ref_init(&t->refs, 2);
gpr_mu_init(&t->mu);
gpr_cv_init(&t->cv);
+ grpc_mdctx_ref(mdctx);
t->metadata_context = mdctx;
t->str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
@@ -1025,8 +1031,6 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
int had_outgoing;
char buffer[GPR_LTOA_MIN_BUFSIZE];
- gpr_log(GPR_DEBUG, "cancel %d", id);
-
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
had_outgoing = s->outgoing_sopb.nops != 0;
diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c
index 3dc23e7de2..1c15716fad 100644
--- a/src/core/transport/metadata.c
+++ b/src/core/transport/metadata.c
@@ -79,7 +79,7 @@ typedef struct internal_metadata {
struct grpc_mdctx {
gpr_uint32 hash_seed;
- int orphaned;
+ int refs;
gpr_mu mu;
@@ -114,7 +114,7 @@ static void unlock(grpc_mdctx *ctx) {
mdelems on every unlock (instead of the usual 'I'm too loaded' trigger
case), since otherwise we can be stuck waiting for a garbage collection
that will never happen. */
- if (ctx->orphaned) {
+ if (ctx->refs == 0) {
/* uncomment if you're having trouble diagnosing an mdelem leak to make
things clearer (slows down destruction a lot, however) */
/* gc_mdtab(ctx); */
@@ -139,7 +139,7 @@ static void ref_md(internal_metadata *md) {
grpc_mdctx *grpc_mdctx_create_with_seed(gpr_uint32 seed) {
grpc_mdctx *ctx = gpr_malloc(sizeof(grpc_mdctx));
- ctx->orphaned = 0;
+ ctx->refs = 1;
ctx->hash_seed = seed;
gpr_mu_init(&ctx->mu);
ctx->strtab = gpr_malloc(sizeof(internal_string *) * INITIAL_STRTAB_CAPACITY);
@@ -197,10 +197,17 @@ static void metadata_context_destroy(grpc_mdctx *ctx) {
gpr_free(ctx);
}
-void grpc_mdctx_orphan(grpc_mdctx *ctx) {
+void grpc_mdctx_ref(grpc_mdctx *ctx) {
lock(ctx);
- GPR_ASSERT(!ctx->orphaned);
- ctx->orphaned = 1;
+ GPR_ASSERT(ctx->refs > 0);
+ ctx->refs++;
+ unlock(ctx);
+}
+
+void grpc_mdctx_unref(grpc_mdctx *ctx) {
+ lock(ctx);
+ GPR_ASSERT(ctx->refs > 0);
+ ctx->refs--;
unlock(ctx);
}
diff --git a/src/core/transport/metadata.h b/src/core/transport/metadata.h
index 430cae6847..7a56e34690 100644
--- a/src/core/transport/metadata.h
+++ b/src/core/transport/metadata.h
@@ -84,7 +84,8 @@ struct grpc_mdelem {
/* Create/orphan a metadata context */
grpc_mdctx *grpc_mdctx_create(void);
grpc_mdctx *grpc_mdctx_create_with_seed(gpr_uint32 seed);
-void grpc_mdctx_orphan(grpc_mdctx *mdctx);
+void grpc_mdctx_ref(grpc_mdctx *mdctx);
+void grpc_mdctx_unref(grpc_mdctx *mdctx);
/* Test only accessors to internal state - only for testing this code - do not
rely on it outside of metadata_test.c */
diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc
index a5ef989946..684b3cbadb 100644
--- a/src/cpp/client/client_unary_call.cc
+++ b/src/cpp/client/client_unary_call.cc
@@ -61,30 +61,4 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
return status;
}
-class ClientAsyncRequest final : public CallOpBuffer {
- public:
- bool FinalizeResult(void **tag, bool *status) override {
- bool r = CallOpBuffer::FinalizeResult(tag, status);
- delete this;
- return r;
- }
-};
-
-void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
- ClientContext *context,
- const google::protobuf::Message &request,
- google::protobuf::Message *result, Status *status,
- CompletionQueue *cq, void *tag) {
- ClientAsyncRequest *buf = new ClientAsyncRequest;
- buf->Reset(tag);
- Call call(channel->CreateCall(method, context, cq));
- buf->AddSendInitialMetadata(context);
- buf->AddSendMessage(request);
- buf->AddRecvInitialMetadata(context);
- buf->AddRecvMessage(result);
- buf->AddClientSendClose();
- buf->AddClientRecvStatus(context, status);
- call.PerformOps(buf);
-}
-
} // namespace grpc
diff --git a/src/csharp/GrpcApi/MathGrpc.cs b/src/csharp/GrpcApi/MathGrpc.cs
index caea1608ec..44e704e496 100644
--- a/src/csharp/GrpcApi/MathGrpc.cs
+++ b/src/csharp/GrpcApi/MathGrpc.cs
@@ -81,7 +81,7 @@ namespace math
Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken));
- Task Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken));
+ void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken));
ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken));
@@ -109,10 +109,10 @@ namespace math
return Calls.AsyncUnaryCall(call, request, token);
}
- public Task Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken))
+ public void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<FibArgs, Num>(fibMethod, channel);
- return Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
+ Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
}
public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken))
diff --git a/src/csharp/GrpcApi/TestServiceGrpc.cs b/src/csharp/GrpcApi/TestServiceGrpc.cs
index 6534a44ef4..64d5c09563 100644
--- a/src/csharp/GrpcApi/TestServiceGrpc.cs
+++ b/src/csharp/GrpcApi/TestServiceGrpc.cs
@@ -99,7 +99,7 @@ namespace grpc.testing
Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken));
- Task StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken));
+ void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken));
ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken));
@@ -141,9 +141,9 @@ namespace grpc.testing
return Calls.AsyncUnaryCall(call, request, token);
}
- public Task StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) {
+ public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) {
var call = new Google.GRPC.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(streamingOutputCallMethod, channel);
- return Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
+ Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
}
public ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
diff --git a/src/csharp/GrpcApiTests/MathClientServerTests.cs b/src/csharp/GrpcApiTests/MathClientServerTests.cs
index bd298b0932..9056142097 100644
--- a/src/csharp/GrpcApiTests/MathClientServerTests.cs
+++ b/src/csharp/GrpcApiTests/MathClientServerTests.cs
@@ -64,6 +64,15 @@ namespace math.Tests
client = MathGrpc.NewStub(channel);
}
+ [TestFixtureTearDown]
+ public void Cleanup()
+ {
+ channel.Dispose();
+
+ server.ShutdownAsync().Wait();
+ GrpcEnvironment.Shutdown();
+ }
+
[Test]
public void Div1()
{
@@ -136,15 +145,6 @@ namespace math.Tests
CollectionAssert.AreEqual(new long[] {3, 4, 3}, result.ConvertAll((divReply) => divReply.Quotient));
CollectionAssert.AreEqual(new long[] {1, 16, 1}, result.ConvertAll((divReply) => divReply.Remainder));
}
-
- [TestFixtureTearDown]
- public void Cleanup()
- {
- channel.Dispose();
-
- server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
- }
}
}
diff --git a/src/csharp/GrpcCore/Calls.cs b/src/csharp/GrpcCore/Calls.cs
index d89d9a16f9..e5ddd879d6 100644
--- a/src/csharp/GrpcCore/Calls.cs
+++ b/src/csharp/GrpcCore/Calls.cs
@@ -47,50 +47,42 @@ namespace Google.GRPC.Core
{
public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{
- //TODO: implement this in real synchronous style once new GRPC C core API is available.
- return AsyncUnaryCall(call, req, token).Result;
+ //TODO: implement this in real synchronous style.
+ try {
+ return AsyncUnaryCall(call, req, token).Result;
+ } catch(AggregateException ae) {
+ foreach (var e in ae.InnerExceptions)
+ {
+ if (e is RpcException)
+ {
+ throw e;
+ }
+ }
+ throw;
+ }
}
public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
- asyncCall.Initialize(call.Channel, call.MethodName);
- asyncCall.Start(false, GetCompletionQueue());
-
- await asyncCall.WriteAsync(req);
- await asyncCall.WritesCompletedAsync();
-
- TResponse response = await asyncCall.ReadAsync();
-
- Status status = await asyncCall.Finished;
-
- if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
- {
- throw new RpcException(status);
- }
- return response;
+ asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName);
+ return await asyncCall.UnaryCallAsync(req);
}
- public static async Task AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token)
+ public static void AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
- asyncCall.Initialize(call.Channel, call.MethodName);
- asyncCall.Start(false, GetCompletionQueue());
- asyncCall.StartReadingToStream(outputs);
-
- await asyncCall.WriteAsync(req);
- await asyncCall.WritesCompletedAsync();
+ asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName);
+ asyncCall.StartServerStreamingCall(req, outputs);
}
public static ClientStreamingAsyncResult<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
- asyncCall.Initialize(call.Channel, call.MethodName);
- asyncCall.Start(false, GetCompletionQueue());
-
- var task = asyncCall.ReadAsync();
- var inputs = new StreamingInputObserver<TRequest, TResponse>(asyncCall);
+ asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName);
+ var task = asyncCall.ClientStreamingCallAsync();
+ var inputs = new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall);
return new ClientStreamingAsyncResult<TRequest, TResponse>(task, inputs);
}
@@ -102,12 +94,10 @@ namespace Google.GRPC.Core
public static IObserver<TRequest> DuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObserver<TResponse> outputs, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
- asyncCall.Initialize(call.Channel, call.MethodName);
- asyncCall.Start(false, GetCompletionQueue());
+ asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName);
- asyncCall.StartReadingToStream(outputs);
- var inputs = new StreamingInputObserver<TRequest, TResponse>(asyncCall);
- return inputs;
+ asyncCall.StartDuplexStreamingCall(outputs);
+ return new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall);
}
private static CompletionQueueSafeHandle GetCompletionQueue() {
diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj
index 34b9f6dfb8..ee76b742ce 100644
--- a/src/csharp/GrpcCore/GrpcCore.csproj
+++ b/src/csharp/GrpcCore/GrpcCore.csproj
@@ -47,21 +47,21 @@
<Compile Include="Internal\ChannelSafeHandle.cs" />
<Compile Include="Internal\CompletionQueueSafeHandle.cs" />
<Compile Include="Internal\Enums.cs" />
- <Compile Include="Internal\Event.cs" />
<Compile Include="Internal\SafeHandleZeroIsInvalid.cs" />
<Compile Include="Internal\Timespec.cs" />
<Compile Include="Internal\GrpcThreadPool.cs" />
<Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Internal\ServerSafeHandle.cs" />
- <Compile Include="Internal\StreamingInputObserver.cs" />
<Compile Include="Method.cs" />
<Compile Include="ServerCalls.cs" />
<Compile Include="ServerCallHandler.cs" />
- <Compile Include="Internal\ServerWritingObserver.cs" />
<Compile Include="Marshaller.cs" />
<Compile Include="ServerServiceDefinition.cs" />
<Compile Include="Utils\RecordingObserver.cs" />
<Compile Include="Utils\RecordingQueue.cs" />
+ <Compile Include="Internal\ClientStreamingInputObserver.cs" />
+ <Compile Include="Internal\ServerStreamingOutputObserver.cs" />
+ <Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/GrpcCore/GrpcEnvironment.cs b/src/csharp/GrpcCore/GrpcEnvironment.cs
index c4f030267d..55a6cac8f6 100644
--- a/src/csharp/GrpcCore/GrpcEnvironment.cs
+++ b/src/csharp/GrpcCore/GrpcEnvironment.cs
@@ -42,7 +42,7 @@ namespace Google.GRPC.Core
/// </summary>
public class GrpcEnvironment
{
- const int THREAD_POOL_SIZE = 1;
+ const int THREAD_POOL_SIZE = 4;
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_init();
diff --git a/src/csharp/GrpcCore/Internal/AsyncCall.cs b/src/csharp/GrpcCore/Internal/AsyncCall.cs
index d5f3239e1e..ce0ba30d53 100644
--- a/src/csharp/GrpcCore/Internal/AsyncCall.cs
+++ b/src/csharp/GrpcCore/Internal/AsyncCall.cs
@@ -2,11 +2,11 @@
// Copyright 2015, Google Inc.
// All rights reserved.
-//
+//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
-//
+//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
@@ -16,7 +16,7 @@
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
-//
+//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
@@ -42,171 +42,177 @@ using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core.Internal
{
/// <summary>
- /// Listener for call events that can be delivered from a completion queue.
+ /// Handles native call lifecycle and provides convenience methods.
/// </summary>
- internal interface ICallEventListener {
-
- void OnClientMetadata();
-
- void OnRead(byte[] payload);
-
- void OnWriteAccepted(GRPCOpError error);
-
- void OnFinishAccepted(GRPCOpError error);
-
- // ignore the status on server
- void OnFinished(Status status);
- }
-
- /// <summary>
- /// Handle native call lifecycle and provides convenience methods.
- /// </summary>
- internal class AsyncCall<TWrite, TRead>: ICallEventListener, IDisposable
+ internal class AsyncCall<TWrite, TRead>
{
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
- // TODO: make sure the delegate doesn't get garbage collected while
- // native callbacks are in the completion queue.
- readonly EventCallbackDelegate callbackHandler;
+ readonly CompletionCallbackDelegate unaryResponseHandler;
+ readonly CompletionCallbackDelegate finishedHandler;
+ readonly CompletionCallbackDelegate writeFinishedHandler;
+ readonly CompletionCallbackDelegate readFinishedHandler;
+ readonly CompletionCallbackDelegate halfclosedHandler;
+ readonly CompletionCallbackDelegate finishedServersideHandler;
object myLock = new object();
- bool disposed;
+ GCHandle gchandle;
CallSafeHandle call;
+ bool disposed;
+
+ bool server;
bool started;
bool errorOccured;
-
bool cancelRequested;
+ bool readingDone;
bool halfcloseRequested;
bool halfclosed;
- bool doneWithReading;
- Nullable<Status> finishedStatus;
+ bool finished;
+ // Completion of a pending write if not null.
TaskCompletionSource<object> writeTcs;
+
+ // Completion of a pending read if not null.
TaskCompletionSource<TRead> readTcs;
- TaskCompletionSource<object> halfcloseTcs = new TaskCompletionSource<object>();
- TaskCompletionSource<Status> finishedTcs = new TaskCompletionSource<Status>();
+ // Completion of a pending halfclose if not null.
+ TaskCompletionSource<object> halfcloseTcs;
+
+ // Completion of a pending unary response if not null.
+ TaskCompletionSource<TRead> unaryResponseTcs;
+
+ // Set after status is received on client. Only used for server streaming and duplex streaming calls.
+ Nullable<Status> finishedStatus;
+ TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
+
+ // For streaming, the reads will be delivered to this observer.
IObserver<TRead> readObserver;
public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = serializer;
this.deserializer = deserializer;
- this.callbackHandler = HandleEvent;
+ this.unaryResponseHandler = HandleUnaryResponse;
+ this.finishedHandler = HandleFinished;
+ this.writeFinishedHandler = HandleWriteFinished;
+ this.readFinishedHandler = HandleReadFinished;
+ this.halfclosedHandler = HandleHalfclosed;
+ this.finishedServersideHandler = HandleFinishedServerside;
}
- public Task WriteAsync(TWrite msg)
+ public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName)
{
- return StartWrite(msg, false).Task;
+ InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false);
}
- public Task WritesCompletedAsync()
+ public void InitializeServer(CallSafeHandle call)
{
- WritesDone();
- return halfcloseTcs.Task;
+ InitializeInternal(call, true);
}
- public Task WriteStatusAsync(Status status)
+ public Task<TRead> UnaryCallAsync(TWrite msg)
{
- WriteStatus(status);
- return halfcloseTcs.Task;
- }
+ lock (myLock)
+ {
+ started = true;
+ halfcloseRequested = true;
+ readingDone = true;
- public Task<TRead> ReadAsync()
- {
- return StartRead().Task;
- }
+ // TODO: handle serialization error...
+ byte[] payload = serializer(msg);
- public Task Halfclosed
- {
- get
- {
- return halfcloseTcs.Task;
+ unaryResponseTcs = new TaskCompletionSource<TRead>();
+ call.StartUnary(payload, unaryResponseHandler);
+
+ return unaryResponseTcs.Task;
}
}
- public Task<Status> Finished
+ public Task<TRead> ClientStreamingCallAsync()
{
- get
+ lock (myLock)
{
- return finishedTcs.Task;
+ started = true;
+ readingDone = true;
+
+ unaryResponseTcs = new TaskCompletionSource<TRead>();
+ call.StartClientStreaming(unaryResponseHandler);
+
+ return unaryResponseTcs.Task;
}
}
- /// <summary>
- /// Initiates reading to given observer.
- /// </summary>
- public void StartReadingToStream(IObserver<TRead> readObserver) {
+ public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver)
+ {
lock (myLock)
{
- CheckStarted();
- if (this.readObserver != null)
- {
- throw new InvalidOperationException("Already registered an observer.");
- }
+ started = true;
+ halfcloseRequested = true;
+
this.readObserver = readObserver;
- StartRead();
- }
- }
- public void Initialize(Channel channel, String methodName) {
- lock (myLock)
- {
- this.call = CallSafeHandle.Create(channel.Handle, methodName, channel.Target, Timespec.InfFuture);
+ // TODO: handle serialization error...
+ byte[] payload = serializer(msg);
+
+ call.StartServerStreaming(payload, finishedHandler);
+
+ ReceiveMessageAsync();
}
}
- public void InitializeServer(CallSafeHandle call)
+ public void StartDuplexStreamingCall(IObserver<TRead> readObserver)
{
- lock(myLock)
+ lock (myLock)
{
- this.call = call;
+ started = true;
+
+ this.readObserver = readObserver;
+
+ call.StartDuplexStreaming(finishedHandler);
+
+ ReceiveMessageAsync();
}
}
- // Client only
- public void Start(bool buffered, CompletionQueueSafeHandle cq)
+ public Task ServerSideUnaryRequestCallAsync()
{
lock (myLock)
{
- if (started)
- {
- throw new InvalidOperationException("Already started.");
- }
-
- call.Invoke(cq, buffered, callbackHandler, callbackHandler);
started = true;
+ call.StartServerSide(finishedServersideHandler);
+ return finishedServersideTcs.Task;
}
}
- // Server only
- public void Accept(CompletionQueueSafeHandle cq)
+ public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver)
{
lock (myLock)
{
- if (started)
+ started = true;
+ call.StartServerSide(finishedServersideHandler);
+
+ if (this.readObserver != null)
{
- throw new InvalidOperationException("Already started.");
+ throw new InvalidOperationException("Already registered an observer.");
}
+ this.readObserver = readObserver;
+ ReceiveMessageAsync();
- call.ServerAccept(cq, callbackHandler);
- call.ServerEndInitialMetadata(0);
- started = true;
+ return finishedServersideTcs.Task;
}
}
- public TaskCompletionSource<object> StartWrite(TWrite msg, bool buffered)
+ public Task SendMessageAsync(TWrite msg)
{
lock (myLock)
{
+ CheckNotDisposed();
CheckStarted();
- CheckNotFinished();
CheckNoError();
- CheckCancelNotRequested();
- if (halfcloseRequested || halfclosed)
+ if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
@@ -219,63 +225,62 @@ namespace Google.GRPC.Core.Internal
// TODO: wrap serialization...
byte[] payload = serializer(msg);
- call.StartWrite(payload, buffered, callbackHandler);
+ call.StartSendMessage(payload, writeFinishedHandler);
writeTcs = new TaskCompletionSource<object>();
- return writeTcs;
+ return writeTcs.Task;
}
}
- // client only
- public void WritesDone()
+ public Task SendCloseFromClientAsync()
{
lock (myLock)
{
+ CheckNotDisposed();
CheckStarted();
- CheckNotFinished();
CheckNoError();
- CheckCancelNotRequested();
- if (halfcloseRequested || halfclosed)
+ if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
- call.WritesDone(callbackHandler);
+ call.StartSendCloseFromClient(halfclosedHandler);
+
halfcloseRequested = true;
+ halfcloseTcs = new TaskCompletionSource<object>();
+ return halfcloseTcs.Task;
}
}
- // server only
- public void WriteStatus(Status status)
+ public Task SendStatusFromServerAsync(Status status)
{
lock (myLock)
{
+ CheckNotDisposed();
CheckStarted();
- CheckNotFinished();
CheckNoError();
- CheckCancelNotRequested();
- if (halfcloseRequested || halfclosed)
+ if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
- call.StartWriteStatus(status, callbackHandler);
+ call.StartSendStatusFromServer(status, halfclosedHandler);
halfcloseRequested = true;
+ halfcloseTcs = new TaskCompletionSource<object>();
+ return halfcloseTcs.Task;
}
}
- public TaskCompletionSource<TRead> StartRead()
+ public Task<TRead> ReceiveMessageAsync()
{
lock (myLock)
{
+ CheckNotDisposed();
CheckStarted();
- CheckNotFinished();
CheckNoError();
- // TODO: add check for not cancelled?
-
- if (doneWithReading)
+ if (readingDone)
{
throw new InvalidOperationException("Already read the last message.");
}
@@ -285,10 +290,10 @@ namespace Google.GRPC.Core.Internal
throw new InvalidOperationException("Only one read can be pending at a time");
}
- call.StartRead(callbackHandler);
+ call.StartReceiveMessage(readFinishedHandler);
readTcs = new TaskCompletionSource<TRead>();
- return readTcs;
+ return readTcs.Task;
}
}
@@ -296,9 +301,8 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
+ CheckNotDisposed();
CheckStarted();
- CheckNotFinished();
-
cancelRequested = true;
}
// grpc_call_cancel is threadsafe
@@ -309,218 +313,304 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
+ CheckNotDisposed();
CheckStarted();
- CheckNotFinished();
-
cancelRequested = true;
}
// grpc_call_cancel_with_status is threadsafe
call.CancelWithStatus(status);
}
- public void OnClientMetadata()
+ private void InitializeInternal(CallSafeHandle call, bool server)
{
- // TODO: implement....
+ lock (myLock)
+ {
+ // Make sure this object and the delegated held by it will not be garbage collected
+ // before we release this handle.
+ gchandle = GCHandle.Alloc(this);
+ this.call = call;
+ this.server = server;
+ }
}
- public void OnRead(byte[] payload)
+ private void CheckStarted()
{
- TaskCompletionSource<TRead> oldTcs = null;
- IObserver<TRead> observer = null;
- lock (myLock)
+ if (!started)
{
- oldTcs = readTcs;
- readTcs = null;
- if (payload == null)
- {
- doneWithReading = true;
- }
- observer = readObserver;
+ throw new InvalidOperationException("Call not started");
}
+ }
- // TODO: wrap deserialization...
- TRead msg = payload != null ? deserializer(payload) : default(TRead);
-
- oldTcs.SetResult(msg);
-
- // TODO: make sure we deliver reads in the right order.
+ private void CheckNotDisposed()
+ {
+ if (disposed)
+ {
+ throw new InvalidOperationException("Call has already been disposed.");
+ }
+ }
- if (observer != null)
+ private void CheckNoError()
+ {
+ if (errorOccured)
{
- if (payload != null)
- {
- // TODO: wrap to handle exceptions
- observer.OnNext(msg);
+ throw new InvalidOperationException("Error occured when processing call.");
+ }
+ }
- // start a new read
- StartRead();
- }
- else
+ private bool ReleaseResourcesIfPossible()
+ {
+ if (!disposed && call != null)
+ {
+ if (halfclosed && readingDone && finished)
{
- // TODO: wrap to handle exceptions;
- observer.OnCompleted();
+ ReleaseResources();
+ return true;
}
+ }
+ return false;
+ }
+ private void ReleaseResources()
+ {
+ if (call != null) {
+ call.Dispose();
}
+ gchandle.Free();
+ disposed = true;
}
- public void OnWriteAccepted(GRPCOpError error)
+ private void CompleteStreamObserver(Status status)
{
- TaskCompletionSource<object> oldTcs = null;
- lock (myLock)
+ if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
{
- UpdateErrorOccured(error);
- oldTcs = writeTcs;
- writeTcs = null;
+ // TODO: wrap to handle exceptions;
+ readObserver.OnError(new RpcException(status));
+ } else {
+ // TODO: wrap to handle exceptions;
+ readObserver.OnCompleted();
}
+ }
- if (errorOccured)
+ /// <summary>
+ /// Handler for unary response completion.
+ /// </summary>
+ private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
{
- // TODO: use the right type of exception...
- oldTcs.SetException(new Exception("Write failed"));
- }
- else
+ TaskCompletionSource<TRead> tcs;
+ lock(myLock)
+ {
+ finished = true;
+ halfclosed = true;
+ tcs = unaryResponseTcs;
+
+ ReleaseResourcesIfPossible();
+ }
+
+ var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+
+ if (error != GRPCOpError.GRPC_OP_OK)
+ {
+ tcs.SetException(new RpcException(
+ new Status(StatusCode.GRPC_STATUS_INTERNAL, "Internal error occured.")
+ ));
+ return;
+ }
+
+ var status = ctx.GetReceivedStatus();
+ if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
+ {
+ tcs.SetException(new RpcException(status));
+ return;
+ }
+
+ // TODO: handle deserialize error...
+ var msg = deserializer(ctx.GetReceivedMessage());
+ tcs.SetResult(msg);
+ }
+ catch(Exception e)
{
- // TODO: where does the continuation run?
- oldTcs.SetResult(null);
+ Console.WriteLine("Caught exception in a native handler: " + e);
}
}
- public void OnFinishAccepted(GRPCOpError error)
+ private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr)
{
- lock (myLock)
+ try
{
- UpdateErrorOccured(error);
- halfclosed = true;
- }
+ TaskCompletionSource<object> oldTcs = null;
+ lock (myLock)
+ {
+ oldTcs = writeTcs;
+ writeTcs = null;
+ }
- if (errorOccured)
- {
- halfcloseTcs.SetException(new Exception("Halfclose failed"));
+ if (errorOccured)
+ {
+ // TODO: use the right type of exception...
+ oldTcs.SetException(new Exception("Write failed"));
+ }
+ else
+ {
+ // TODO: where does the continuation run?
+ oldTcs.SetResult(null);
+ }
}
- else
+ catch(Exception e)
{
- halfcloseTcs.SetResult(null);
+ Console.WriteLine("Caught exception in a native handler: " + e);
}
-
}
- public void OnFinished(Status status)
+ private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr)
{
- lock (myLock)
+ try
{
- finishedStatus = status;
+ lock (myLock)
+ {
+ halfclosed = true;
- DisposeResourcesIfNeeded();
- }
- finishedTcs.SetResult(status);
+ ReleaseResourcesIfPossible();
+ }
- }
+ if (error != GRPCOpError.GRPC_OP_OK)
+ {
+ halfcloseTcs.SetException(new Exception("Halfclose failed"));
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
+ }
+ else
+ {
+ halfcloseTcs.SetResult(null);
+ }
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Caught exception in a native handler: " + e);
+ }
}
- protected virtual void Dispose(bool disposing)
+ private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr)
{
- if (!disposed)
+ try
{
- if (disposing)
+ var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+ var payload = ctx.GetReceivedMessage();
+
+ TaskCompletionSource<TRead> oldTcs = null;
+ IObserver<TRead> observer = null;
+
+ Nullable<Status> status = null;
+
+ lock (myLock)
{
- if (call != null)
+ oldTcs = readTcs;
+ readTcs = null;
+ if (payload == null)
{
- call.Dispose();
+ readingDone = true;
}
+ observer = readObserver;
+ status = finishedStatus;
}
- disposed = true;
- }
- }
- private void UpdateErrorOccured(GRPCOpError error)
- {
- if (error == GRPCOpError.GRPC_OP_ERROR)
- {
- errorOccured = true;
- }
- }
+ // TODO: wrap deserialization...
+ TRead msg = payload != null ? deserializer(payload) : default(TRead);
- private void CheckStarted()
- {
- if (!started)
- {
- throw new InvalidOperationException("Call not started");
- }
- }
+ oldTcs.SetResult(msg);
- private void CheckNoError()
- {
- if (errorOccured)
+ // TODO: make sure we deliver reads in the right order.
+
+ if (observer != null)
+ {
+ if (payload != null)
+ {
+ // TODO: wrap to handle exceptions
+ observer.OnNext(msg);
+
+ // start a new read
+ ReceiveMessageAsync();
+ }
+ else
+ {
+ if (!server)
+ {
+ if (status.HasValue)
+ {
+ CompleteStreamObserver(status.Value);
+ }
+ }
+ else
+ {
+ // TODO: wrap to handle exceptions..
+ observer.OnCompleted();
+ }
+ // TODO: completeStreamObserver serverside...
+ }
+ }
+ }
+ catch(Exception e)
{
- throw new InvalidOperationException("Error occured when processing call.");
+ Console.WriteLine("Caught exception in a native handler: " + e);
}
}
- private void CheckNotFinished()
+ private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr)
{
- if (finishedStatus.HasValue)
+ try
{
- throw new InvalidOperationException("Already finished.");
- }
- }
+ var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+ var status = ctx.GetReceivedStatus();
- private void CheckCancelNotRequested()
- {
- if (cancelRequested)
+ bool wasReadingDone;
+
+ lock (myLock)
+ {
+ finished = true;
+ finishedStatus = status;
+
+ wasReadingDone = readingDone;
+
+ ReleaseResourcesIfPossible();
+ }
+
+ if (wasReadingDone) {
+ CompleteStreamObserver(status);
+ }
+
+ }
+ catch(Exception e)
{
- throw new InvalidOperationException("Cancel has been requested.");
+ Console.WriteLine("Caught exception in a native handler: " + e);
}
}
- private void DisposeResourcesIfNeeded()
+ private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr)
{
- if (call != null && started && finishedStatus.HasValue)
+ try
{
- // TODO: should we also wait for all the pending events to finish?
-
- call.Dispose();
- }
- }
+ var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
- private void HandleEvent(IntPtr eventPtr) {
- try {
- var ev = new EventSafeHandleNotOwned(eventPtr);
- switch (ev.GetCompletionType())
+ lock(myLock)
{
- case GRPCCompletionType.GRPC_CLIENT_METADATA_READ:
- OnClientMetadata();
- break;
-
- case GRPCCompletionType.GRPC_READ:
- byte[] payload = ev.GetReadData();
- OnRead(payload);
- break;
+ finished = true;
- case GRPCCompletionType.GRPC_WRITE_ACCEPTED:
- OnWriteAccepted(ev.GetWriteAccepted());
- break;
+ // TODO: because of the way server calls are implemented, we need to set
+ // reading done to true here. Should be fixed in the future.
+ readingDone = true;
- case GRPCCompletionType.GRPC_FINISH_ACCEPTED:
- OnFinishAccepted(ev.GetFinishAccepted());
- break;
+ ReleaseResourcesIfPossible();
+ }
+ // TODO: handle error ...
- case GRPCCompletionType.GRPC_FINISHED:
- OnFinished(ev.GetFinished());
- break;
+ finishedServersideTcs.SetResult(null);
- default:
- throw new ArgumentException("Unexpected completion type");
- }
- } catch(Exception e) {
+ }
+ catch(Exception e)
+ {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
}
-}
+} \ No newline at end of file
diff --git a/src/csharp/GrpcCore/Internal/BatchContextSafeHandleNotOwned.cs b/src/csharp/GrpcCore/Internal/BatchContextSafeHandleNotOwned.cs
new file mode 100644
index 0000000000..ddfd94a3b5
--- /dev/null
+++ b/src/csharp/GrpcCore/Internal/BatchContextSafeHandleNotOwned.cs
@@ -0,0 +1,96 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+using Google.GRPC.Core;
+
+namespace Google.GRPC.Core.Internal
+{
+ /// <summary>
+ /// Not owned version of
+ /// grpcsharp_batch_context
+ /// </summary>
+ internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid
+ {
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char*
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char*
+
+ public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false)
+ {
+ SetHandle(handle);
+ }
+
+ public Status GetReceivedStatus()
+ {
+ // TODO: can the native method return string directly?
+ string details = Marshal.PtrToStringAnsi(grpcsharp_batch_context_recv_status_on_client_details(this));
+ return new Status(grpcsharp_batch_context_recv_status_on_client_status(this), details);
+ }
+
+ public byte[] GetReceivedMessage()
+ {
+ IntPtr len = grpcsharp_batch_context_recv_message_length(this);
+ if (len == new IntPtr(-1))
+ {
+ return null;
+ }
+ byte[] data = new byte[(int) len];
+ grpcsharp_batch_context_recv_message_to_buffer(this, data, new UIntPtr((ulong)data.Length));
+ return data;
+ }
+
+ public CallSafeHandle GetServerRpcNewCall() {
+ return grpcsharp_batch_context_server_rpc_new_call(this);
+ }
+
+ public string GetServerRpcNewMethod() {
+ return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this));
+ }
+ }
+} \ No newline at end of file
diff --git a/src/csharp/GrpcCore/Internal/CallSafeHandle.cs b/src/csharp/GrpcCore/Internal/CallSafeHandle.cs
index e9ccd8d5f9..55d66a62ca 100644
--- a/src/csharp/GrpcCore/Internal/CallSafeHandle.cs
+++ b/src/csharp/GrpcCore/Internal/CallSafeHandle.cs
@@ -2,11 +2,11 @@
// Copyright 2015, Google Inc.
// All rights reserved.
-//
+//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
-//
+//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
@@ -16,7 +16,7 @@
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
-//
+//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
@@ -38,8 +38,8 @@ using Google.GRPC.Core;
namespace Google.GRPC.Core.Internal
{
- // TODO: we need to make sure that the delegates are not collected before invoked.
- internal delegate void EventCallbackDelegate(IntPtr eventPtr);
+ //TODO: rename the delegate
+ internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr);
/// <summary>
/// grpc_call from <grpc/grpc.h>
@@ -49,142 +49,108 @@ namespace Google.GRPC.Core.Internal
const UInt32 GRPC_WRITE_BUFFER_HINT = 1;
[DllImport("grpc_csharp_ext.dll")]
- static extern CallSafeHandle grpcsharp_channel_create_call_old(ChannelSafeHandle channel, string method, string host, Timespec deadline);
+ static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_call_add_metadata(CallSafeHandle call, IntPtr metadata, UInt32 flags);
+ static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_call_invoke_old(CallSafeHandle call, CompletionQueueSafeHandle cq, IntPtr metadataReadTag, IntPtr finishedTag, UInt32 flags);
-
- [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_invoke_old")]
- static extern GRPCCallError grpcsharp_call_invoke_old_CALLBACK(CallSafeHandle call, CompletionQueueSafeHandle cq,
- [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate metadataReadCallback,
- [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate finishedCallback,
- UInt32 flags);
+ static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_call_server_accept_old(CallSafeHandle call, CompletionQueueSafeHandle completionQueue, IntPtr finishedTag);
-
- [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_server_accept_old")]
- static extern GRPCCallError grpcsharp_call_server_accept_old_CALLBACK(CallSafeHandle call, CompletionQueueSafeHandle completionQueue, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate finishedCallback);
+ static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
+ byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_call_server_end_initial_metadata_old(CallSafeHandle call, UInt32 flags);
+ static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
+ static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
+ byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
+ static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_call_start_write_status_old(CallSafeHandle call, StatusCode statusCode, string statusMessage, IntPtr tag);
-
- [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_write_status_old")]
- static extern GRPCCallError grpcsharp_call_start_write_status_old_CALLBACK(CallSafeHandle call, StatusCode statusCode, string statusMessage, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
+ static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
+ byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_call_writes_done_old(CallSafeHandle call, IntPtr tag);
-
- [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_writes_done_old")]
- static extern GRPCCallError grpcsharp_call_writes_done_old_CALLBACK(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
+ static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_call_start_read_old(CallSafeHandle call, IntPtr tag);
-
- [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_read_old")]
- static extern GRPCCallError grpcsharp_call_start_read_old_CALLBACK(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
+ static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage);
[DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_call_start_write_from_copied_buffer(CallSafeHandle call,
- byte[] buffer, UIntPtr length,
- IntPtr tag, UInt32 flags);
+ static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
- [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_write_from_copied_buffer")]
- static extern void grpcsharp_call_start_write_from_copied_buffer_CALLBACK(CallSafeHandle call,
- byte[] buffer, UIntPtr length,
- [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback,
- UInt32 flags);
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
- [DllImport("grpc_csharp_ext.dll")]
+ [DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_destroy(IntPtr call);
- private CallSafeHandle()
- {
- }
-
- /// <summary>
- /// Creates a client call.
- /// </summary>
- public static CallSafeHandle Create(ChannelSafeHandle channel, string method, string host, Timespec deadline)
- {
- return grpcsharp_channel_create_call_old(channel, method, host, deadline);
- }
-
- public void Invoke(CompletionQueueSafeHandle cq, IntPtr metadataReadTag, IntPtr finishedTag, bool buffered)
- {
- AssertCallOk(grpcsharp_call_invoke_old(this, cq, metadataReadTag, finishedTag, GetFlags(buffered)));
- }
-
- public void Invoke(CompletionQueueSafeHandle cq, bool buffered, EventCallbackDelegate metadataReadCallback, EventCallbackDelegate finishedCallback)
- {
- AssertCallOk(grpcsharp_call_invoke_old_CALLBACK(this, cq, metadataReadCallback, finishedCallback, GetFlags(buffered)));
- }
- public void ServerAccept(CompletionQueueSafeHandle cq, IntPtr finishedTag)
+ private CallSafeHandle()
{
- AssertCallOk(grpcsharp_call_server_accept_old(this, cq, finishedTag));
}
- public void ServerAccept(CompletionQueueSafeHandle cq, EventCallbackDelegate callback)
+ public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
- AssertCallOk(grpcsharp_call_server_accept_old_CALLBACK(this, cq, callback));
+ return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
}
- public void ServerEndInitialMetadata(UInt32 flags)
+ public void StartUnary(byte[] payload, CompletionCallbackDelegate callback)
{
- AssertCallOk(grpcsharp_call_server_end_initial_metadata_old(this, flags));
+ AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length)));
}
- public void StartWrite(byte[] payload, IntPtr tag, bool buffered)
+ public void StartClientStreaming(CompletionCallbackDelegate callback)
{
- grpcsharp_call_start_write_from_copied_buffer(this, payload, new UIntPtr((ulong) payload.Length), tag, GetFlags(buffered));
+ AssertCallOk(grpcsharp_call_start_client_streaming(this, callback));
}
- public void StartWrite(byte[] payload, bool buffered, EventCallbackDelegate callback)
+ public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback)
{
- grpcsharp_call_start_write_from_copied_buffer_CALLBACK(this, payload, new UIntPtr((ulong) payload.Length), callback, GetFlags(buffered));
+ AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong) payload.Length)));
}
- public void StartWriteStatus(Status status, IntPtr tag)
+ public void StartDuplexStreaming(CompletionCallbackDelegate callback)
{
- AssertCallOk(grpcsharp_call_start_write_status_old(this, status.StatusCode, status.Detail, tag));
+ AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback));
}
- public void StartWriteStatus(Status status, EventCallbackDelegate callback)
+ public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
{
- AssertCallOk(grpcsharp_call_start_write_status_old_CALLBACK(this, status.StatusCode, status.Detail, callback));
+ AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong) payload.Length)));
}
- public void WritesDone(IntPtr tag)
+ public void StartSendCloseFromClient(CompletionCallbackDelegate callback)
{
- AssertCallOk(grpcsharp_call_writes_done_old(this, tag));
+ AssertCallOk(grpcsharp_call_send_close_from_client(this, callback));
}
- public void WritesDone(EventCallbackDelegate callback)
+ public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback)
{
- AssertCallOk(grpcsharp_call_writes_done_old_CALLBACK(this, callback));
+ AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail));
}
- public void StartRead(IntPtr tag)
+ public void StartReceiveMessage(CompletionCallbackDelegate callback)
{
- AssertCallOk(grpcsharp_call_start_read_old(this, tag));
+ AssertCallOk(grpcsharp_call_recv_message(this, callback));
}
- public void StartRead(EventCallbackDelegate callback)
+ public void StartServerSide(CompletionCallbackDelegate callback)
{
- AssertCallOk(grpcsharp_call_start_read_old_CALLBACK(this, callback));
+ AssertCallOk(grpcsharp_call_start_serverside(this, callback));
}
public void Cancel()
@@ -212,4 +178,4 @@ namespace Google.GRPC.Core.Internal
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
}
}
-}
+} \ No newline at end of file
diff --git a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs b/src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs
index 60837de5e6..4d10a9bdf9 100644
--- a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs
+++ b/src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs
@@ -2,11 +2,11 @@
// Copyright 2015, Google Inc.
// All rights reserved.
-//
+//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
-//
+//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
@@ -16,7 +16,7 @@
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
-//
+//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
@@ -36,19 +36,20 @@ using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core.Internal
{
- internal class StreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
+ internal class ClientStreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
{
readonly AsyncCall<TWrite, TRead> call;
- public StreamingInputObserver(AsyncCall<TWrite, TRead> call)
+ public ClientStreamingInputObserver(AsyncCall<TWrite, TRead> call)
{
this.call = call;
}
public void OnCompleted()
{
+
// TODO: how bad is the Wait here?
- call.WritesCompletedAsync().Wait();
+ call.SendCloseFromClientAsync().Wait();
}
public void OnError(Exception error)
@@ -59,7 +60,7 @@ namespace Google.GRPC.Core.Internal
public void OnNext(TWrite value)
{
// TODO: how bad is the Wait here?
- call.WriteAsync(value).Wait();
+ call.SendMessageAsync(value).Wait();
}
}
}
diff --git a/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs b/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs
index 666f220b8c..5ea436df19 100644
--- a/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs
@@ -46,12 +46,6 @@ namespace Google.GRPC.Core.Internal
static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create();
[DllImport("grpc_csharp_ext.dll")]
- static extern EventSafeHandle grpcsharp_completion_queue_pluck(CompletionQueueSafeHandle cq, IntPtr tag, Timespec deadline);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern EventSafeHandle grpcsharp_completion_queue_next(CompletionQueueSafeHandle cq, Timespec deadline);
-
- [DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
@@ -69,21 +63,11 @@ namespace Google.GRPC.Core.Internal
return grpcsharp_completion_queue_create();
}
- public EventSafeHandle Next(Timespec deadline)
- {
- return grpcsharp_completion_queue_next(this, deadline);
- }
-
public GRPCCompletionType NextWithCallback()
{
return grpcsharp_completion_queue_next_with_callback(this);
}
- public EventSafeHandle Pluck(IntPtr tag, Timespec deadline)
- {
- return grpcsharp_completion_queue_pluck(this, tag, deadline);
- }
-
public void Shutdown()
{
grpcsharp_completion_queue_shutdown(this);
diff --git a/src/csharp/GrpcCore/Internal/Event.cs b/src/csharp/GrpcCore/Internal/Event.cs
deleted file mode 100644
index 6116e0975a..0000000000
--- a/src/csharp/GrpcCore/Internal/Event.cs
+++ /dev/null
@@ -1,224 +0,0 @@
-#region Copyright notice and license
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-#endregion
-
-using System;
-using System.Runtime.InteropServices;
-using Google.GRPC.Core;
-
-namespace Google.GRPC.Core.Internal
-{
- /// <summary>
- /// grpc_event from grpc/grpc.h
- /// </summary>
- internal class EventSafeHandle : SafeHandleZeroIsInvalid
- {
- [DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_event_finish(IntPtr ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCompletionType grpcsharp_event_type(EventSafeHandle ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern CallSafeHandle grpcsharp_event_call(EventSafeHandle ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern GRPCOpError grpcsharp_event_write_accepted(EventSafeHandle ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern GRPCOpError grpcsharp_event_finish_accepted(EventSafeHandle ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern StatusCode grpcsharp_event_finished_status(EventSafeHandle ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_event_finished_details(EventSafeHandle ev); // returns const char*
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_event_read_length(EventSafeHandle ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_event_read_copy_to_buffer(EventSafeHandle ev, byte[] buffer, UIntPtr bufferLen);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_event_server_rpc_new_method(EventSafeHandle ev); // returns const char*
-
- public GRPCCompletionType GetCompletionType()
- {
- return grpcsharp_event_type(this);
- }
-
- public GRPCOpError GetWriteAccepted()
- {
- return grpcsharp_event_write_accepted(this);
- }
-
- public GRPCOpError GetFinishAccepted()
- {
- return grpcsharp_event_finish_accepted(this);
- }
-
- public Status GetFinished()
- {
- // TODO: can the native method return string directly?
- string details = Marshal.PtrToStringAnsi(grpcsharp_event_finished_details(this));
- return new Status(grpcsharp_event_finished_status(this), details);
- }
-
- public byte[] GetReadData()
- {
- IntPtr len = grpcsharp_event_read_length(this);
- if (len == new IntPtr(-1))
- {
- return null;
- }
- byte[] data = new byte[(int) len];
- grpcsharp_event_read_copy_to_buffer(this, data, new UIntPtr((ulong)data.Length));
- return data;
- }
-
- public CallSafeHandle GetCall() {
- return grpcsharp_event_call(this);
- }
-
- public string GetServerRpcNewMethod() {
- // TODO: can the native method return string directly?
- return Marshal.PtrToStringAnsi(grpcsharp_event_server_rpc_new_method(this));
- }
-
- //TODO: client_metadata_read event type
-
- protected override bool ReleaseHandle()
- {
- grpcsharp_event_finish(handle);
- return true;
- }
- }
-
- // TODO: this is basically c&p of EventSafeHandle. Unify!
- /// <summary>
- /// Not owned version of
- /// grpc_event from grpc/grpc.h
- /// </summary>
- internal class EventSafeHandleNotOwned : SafeHandleZeroIsInvalid
- {
- [DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_event_finish(IntPtr ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCompletionType grpcsharp_event_type(EventSafeHandleNotOwned ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern CallSafeHandle grpcsharp_event_call(EventSafeHandleNotOwned ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern GRPCOpError grpcsharp_event_write_accepted(EventSafeHandleNotOwned ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern GRPCOpError grpcsharp_event_finish_accepted(EventSafeHandleNotOwned ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern StatusCode grpcsharp_event_finished_status(EventSafeHandleNotOwned ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_event_finished_details(EventSafeHandleNotOwned ev); // returns const char*
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_event_read_length(EventSafeHandleNotOwned ev);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_event_read_copy_to_buffer(EventSafeHandleNotOwned ev, byte[] buffer, UIntPtr bufferLen);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_event_server_rpc_new_method(EventSafeHandleNotOwned ev); // returns const char*
-
- public EventSafeHandleNotOwned() : base(false)
- {
- }
-
- public EventSafeHandleNotOwned(IntPtr handle) : base(false)
- {
- SetHandle(handle);
- }
-
- public GRPCCompletionType GetCompletionType()
- {
- return grpcsharp_event_type(this);
- }
-
- public GRPCOpError GetWriteAccepted()
- {
- return grpcsharp_event_write_accepted(this);
- }
-
- public GRPCOpError GetFinishAccepted()
- {
- return grpcsharp_event_finish_accepted(this);
- }
-
- public Status GetFinished()
- {
- // TODO: can the native method return string directly?
- string details = Marshal.PtrToStringAnsi(grpcsharp_event_finished_details(this));
- return new Status(grpcsharp_event_finished_status(this), details);
- }
-
- public byte[] GetReadData()
- {
- IntPtr len = grpcsharp_event_read_length(this);
- if (len == new IntPtr(-1))
- {
- return null;
- }
- byte[] data = new byte[(int) len];
- grpcsharp_event_read_copy_to_buffer(this, data, new UIntPtr((ulong)data.Length));
- return data;
- }
-
- public CallSafeHandle GetCall() {
- return grpcsharp_event_call(this);
- }
-
- public string GetServerRpcNewMethod() {
- // TODO: can the native method return string directly?
- return Marshal.PtrToStringAnsi(grpcsharp_event_server_rpc_new_method(this));
- }
-
- //TODO: client_metadata_read event type
-
- protected override bool ReleaseHandle()
- {
- grpcsharp_event_finish(handle);
- return true;
- }
- }
-}
diff --git a/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs b/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs
index f8154fa250..634a0b2d72 100644
--- a/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs
+++ b/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs
@@ -48,7 +48,6 @@ namespace Google.GRPC.Core.Internal
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
- readonly Action<EventSafeHandle> eventHandler;
CompletionQueueSafeHandle cq;
@@ -56,11 +55,6 @@ namespace Google.GRPC.Core.Internal
this.poolSize = poolSize;
}
- internal GrpcThreadPool(int poolSize, Action<EventSafeHandle> eventHandler) {
- this.poolSize = poolSize;
- this.eventHandler = eventHandler;
- }
-
public void Start() {
lock (myLock)
@@ -104,34 +98,19 @@ namespace Google.GRPC.Core.Internal
}
}
- private Thread CreateAndStartThread(int i) {
- Action body;
- if (eventHandler != null)
- {
- body = ThreadBodyWithHandler;
- }
- else
- {
- body = ThreadBodyNoHandler;
- }
- var thread = new Thread(new ThreadStart(body));
+ private Thread CreateAndStartThread(int i)
+ {
+ var thread = new Thread(new ThreadStart(RunHandlerLoop));
thread.IsBackground = false;
thread.Start();
- if (eventHandler != null)
- {
- thread.Name = "grpc_server_newrpc " + i;
- }
- else
- {
- thread.Name = "grpc " + i;
- }
+ thread.Name = "grpc " + i;
return thread;
}
/// <summary>
/// Body of the polling thread.
/// </summary>
- private void ThreadBodyNoHandler()
+ private void RunHandlerLoop()
{
GRPCCompletionType completionType;
do
@@ -140,22 +119,6 @@ namespace Google.GRPC.Core.Internal
} while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN);
Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting.");
}
-
- /// <summary>
- /// Body of the polling thread.
- /// </summary>
- private void ThreadBodyWithHandler()
- {
- GRPCCompletionType completionType;
- do
- {
- using (EventSafeHandle ev = cq.Next(Timespec.InfFuture)) {
- completionType = ev.GetCompletionType();
- eventHandler(ev);
- }
- } while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN);
- Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting.");
- }
}
}
diff --git a/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs b/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs
index 74a8ef7b6e..59f08d4ca8 100644
--- a/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs
+++ b/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs
@@ -56,6 +56,12 @@ namespace Google.GRPC.Core.Internal
return handle == IntPtr.Zero;
}
}
+
+ protected override bool ReleaseHandle()
+ {
+ // handle is not owned.
+ return true;
+ }
}
}
diff --git a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
index c91de97ce3..047bde1add 100644
--- a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
+++ b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
@@ -38,24 +38,22 @@ using System.Collections.Concurrent;
namespace Google.GRPC.Core.Internal
{
+ // TODO: we need to make sure that the delegates are not collected before invoked.
+ internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr);
+
/// <summary>
/// grpc_server from grpc/grpc.h
/// </summary>
internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid
{
- [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_request_call_old")]
- static extern GRPCCallError grpcsharp_server_request_call_old_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
-
[DllImport("grpc_csharp_ext.dll")]
- static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
+ static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
- // TODO: check int representation size
[DllImport("grpc_csharp_ext.dll")]
- static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr);
+ static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
- // TODO: check int representation size
[DllImport("grpc_csharp_ext.dll")]
- static extern int grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr);
+ static extern Int32 grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_start(ServerSafeHandle server);
@@ -63,8 +61,9 @@ namespace Google.GRPC.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_shutdown(ServerSafeHandle server);
+ // TODO: get rid of the old callback style
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_shutdown_and_notify")]
- static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
+ static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_destroy(IntPtr server);
@@ -81,7 +80,6 @@ namespace Google.GRPC.Core.Internal
public int AddPort(string addr)
{
- // TODO: also grpc_server_add_secure_http2_port...
return grpcsharp_server_add_http2_port(this, addr);
}
@@ -95,14 +93,14 @@ namespace Google.GRPC.Core.Internal
grpcsharp_server_shutdown(this);
}
- public void ShutdownAndNotify(EventCallbackDelegate callback)
+ public void ShutdownAndNotify(ServerShutdownCallbackDelegate callback)
{
grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback);
}
- public GRPCCallError RequestCall(EventCallbackDelegate callback)
+ public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
{
- return grpcsharp_server_request_call_old_CALLBACK(this, callback);
+ return grpcsharp_server_request_call(this, cq, callback);
}
protected override bool ReleaseHandle()
diff --git a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs b/src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs
index 1d29864b9f..e9cb65cb3b 100644
--- a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs
+++ b/src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs
@@ -40,11 +40,11 @@ namespace Google.GRPC.Core.Internal
/// Observer that writes all arriving messages to a call abstraction (in blocking fashion)
/// and then halfcloses the call. Used for server-side call handling.
/// </summary>
- internal class ServerWritingObserver<TWrite, TRead> : IObserver<TWrite>
+ internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite>
{
readonly AsyncCall<TWrite, TRead> call;
- public ServerWritingObserver(AsyncCall<TWrite, TRead> call)
+ public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call)
{
this.call = call;
}
@@ -52,19 +52,19 @@ namespace Google.GRPC.Core.Internal
public void OnCompleted()
{
// TODO: how bad is the Wait here?
- call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
+ call.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
}
public void OnError(Exception error)
{
- // TODO: handle this...
+ // TODO: implement this...
throw new InvalidOperationException("This should never be called.");
}
public void OnNext(TWrite value)
{
// TODO: how bad is the Wait here?
- call.WriteAsync(value).Wait();
+ call.SendMessageAsync(value).Wait();
}
}
}
diff --git a/src/csharp/GrpcCore/Server.cs b/src/csharp/GrpcCore/Server.cs
index 0882a61299..91842d8182 100644
--- a/src/csharp/GrpcCore/Server.cs
+++ b/src/csharp/GrpcCore/Server.cs
@@ -49,8 +49,8 @@ namespace Google.GRPC.Core
{
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
- readonly EventCallbackDelegate newRpcHandler;
- readonly EventCallbackDelegate serverShutdownHandler;
+ readonly ServerShutdownCallbackDelegate serverShutdownHandler;
+ readonly CompletionCallbackDelegate newServerRpcHandler;
readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle;
@@ -61,9 +61,8 @@ namespace Google.GRPC.Core
public Server()
{
- // TODO: what is the tag for server shutdown?
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
- this.newRpcHandler = HandleNewRpc;
+ this.newServerRpcHandler = HandleNewServerRpc;
this.serverShutdownHandler = HandleServerShutdown;
}
@@ -99,7 +98,7 @@ namespace Google.GRPC.Core
{
var rpcInfo = newRpcQueue.Take();
- Console.WriteLine("Server received RPC " + rpcInfo.Method);
+ //Console.WriteLine("Server received RPC " + rpcInfo.Method);
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
@@ -138,23 +137,25 @@ namespace Google.GRPC.Core
private void AllowOneRpc()
{
- AssertCallOk(handle.RequestCall(newRpcHandler));
+ AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler));
}
- private void HandleNewRpc(IntPtr eventPtr)
- {
- try
- {
- var ev = new EventSafeHandleNotOwned(eventPtr);
- var rpcInfo = new NewRpcInfo(ev.GetCall(), ev.GetServerRpcNewMethod());
+ private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) {
+ try {
+ var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+
+ if (error != GRPCOpError.GRPC_OP_OK) {
+ // TODO: handle error
+ }
+
+ var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod());
// after server shutdown, the callback returns with null call
if (!rpcInfo.Call.IsInvalid) {
newRpcQueue.Add(rpcInfo);
}
- }
- catch (Exception e)
- {
+
+ } catch(Exception e) {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs
index bcce4a091f..48d1eaa335 100644
--- a/src/csharp/GrpcCore/ServerCallHandler.cs
+++ b/src/csharp/GrpcCore/ServerCallHandler.cs
@@ -59,15 +59,16 @@ namespace Google.GRPC.Core
method.RequestMarshaller.Deserializer);
asyncCall.InitializeServer(call);
- asyncCall.Accept(cq);
+
+ var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync();
- var request = asyncCall.ReadAsync().Result;
+ var request = asyncCall.ReceiveMessageAsync().Result;
- var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
+ var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
handler(request, responseObserver);
- asyncCall.Halfclosed.Wait();
- asyncCall.Finished.Wait();
+ finishedTask.Wait();
+
}
}
@@ -89,16 +90,11 @@ namespace Google.GRPC.Core
method.RequestMarshaller.Deserializer);
asyncCall.InitializeServer(call);
- asyncCall.Accept(cq);
- var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
+ var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
var requestObserver = handler(responseObserver);
-
- // feed the requests
- asyncCall.StartReadingToStream(requestObserver);
-
- asyncCall.Halfclosed.Wait();
- asyncCall.Finished.Wait();
+ var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver);
+ finishedTask.Wait();
}
}
@@ -110,12 +106,31 @@ namespace Google.GRPC.Core
AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
(payload) => payload, (payload) => payload);
+
asyncCall.InitializeServer(call);
- asyncCall.Accept(cq);
- asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();
- asyncCall.Finished.Wait();
+ var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>());
+
+ asyncCall.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();
+
+ finishedTask.Wait();
+ }
+ }
+
+ internal class NullObserver<T> : IObserver<T>
+ {
+ public void OnCompleted()
+ {
+ }
+
+ public void OnError(Exception error)
+ {
}
+
+ public void OnNext(T value)
+ {
+ }
+
}
}
diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs
index 4401156520..ba43e4f6a0 100644
--- a/src/csharp/GrpcCoreTests/ClientServerTest.cs
+++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs
@@ -36,6 +36,7 @@ using NUnit.Framework;
using Google.GRPC.Core;
using Google.GRPC.Core.Internal;
using System.Threading;
+using System.Diagnostics;
using System.Threading.Tasks;
using Google.GRPC.Core.Utils;
@@ -51,11 +52,21 @@ namespace Google.GRPC.Core.Tests
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
- [Test]
- public void EmptyCall()
+ [TestFixtureSetUp]
+ public void Init()
{
GrpcEnvironment.Initialize();
+ }
+
+ [TestFixtureTearDown]
+ public void Cleanup()
+ {
+ GrpcEnvironment.Shutdown();
+ }
+ [Test]
+ public void UnaryCall()
+ {
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService")
@@ -69,19 +80,71 @@ namespace Google.GRPC.Core.Tests
var call = new Call<string, string>(unaryEchoStringMethod, channel);
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)));
+
Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken)));
}
server.ShutdownAsync().Wait();
+ }
- GrpcEnvironment.Shutdown();
+ [Test]
+ public void UnaryCallPerformance()
+ {
+ Server server = new Server();
+ server.AddServiceDefinition(
+ ServerServiceDefinition.CreateBuilder("someService")
+ .AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build());
+
+ int port = server.AddPort(host + ":0");
+ server.Start();
+
+ using (Channel channel = new Channel(host + ":" + port))
+ {
+ var call = new Call<string, string>(unaryEchoStringMethod, channel);
+
+ var stopwatch = new Stopwatch();
+ stopwatch.Start();
+ for (int i = 0; i < 1000; i++)
+ {
+ Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken));
+ }
+ stopwatch.Stop();
+ Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms");
+ }
+
+ server.ShutdownAsync().Wait();
}
- private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) {
+ [Test]
+ public void UnknownMethodHandler()
+ {
+ Server server = new Server();
+ server.AddServiceDefinition(
+ ServerServiceDefinition.CreateBuilder("someService").Build());
+
+ int port = server.AddPort(host + ":0");
+ server.Start();
+
+ using (Channel channel = new Channel(host + ":" + port))
+ {
+ var call = new Call<string, string>(unaryEchoStringMethod, channel);
+
+ try {
+ Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken));
+ Assert.Fail();
+ } catch(RpcException e) {
+ Assert.AreEqual(StatusCode.GRPC_STATUS_UNIMPLEMENTED, e.Status.StatusCode);
+ }
+ }
+
+ server.ShutdownAsync().Wait();
+ }
+
+ private void HandleUnaryEchoString(string request, IObserver<string> responseObserver)
+ {
responseObserver.OnNext(request);
responseObserver.OnCompleted();
}
-
}
}
diff --git a/src/csharp/README.md b/src/csharp/README.md
index a16f1e719e..f56ddabda5 100755
--- a/src/csharp/README.md
+++ b/src/csharp/README.md
@@ -25,10 +25,11 @@ INSTALLATION AND USAGE: WINDOWS
INSTALLATION AND USAGE: LINUX & MONO
------------------------------------
-- Compile and install the gRPC C Core library
+- Compile and install the gRPC C# extension library (that will be used via
+ P/Invoke from C#).
```
-make shared_c
-sudo make install
+make grpc_csharp_ext
+sudo make install_grpc_csharp_ext
```
- Prerequisites for development: Mono framework, MonoDevelop (IDE)
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index c7949af44e..304ee9cf34 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -32,9 +32,11 @@
*/
#include <grpc/support/port_platform.h>
+#include <grpc/support/alloc.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
+#include <grpc/support/string.h>
#include <string.h>
@@ -58,6 +60,139 @@ grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) {
return bb;
}
+typedef void(GPR_CALLTYPE *callback_funcptr)(grpc_op_error op_error,
+ void *batch_context);
+
+/*
+ * Helper to maintain lifetime of batch op inputs and store batch op outputs.
+ */
+typedef struct gprcsharp_batch_context {
+ grpc_metadata_array send_initial_metadata;
+ grpc_byte_buffer *send_message;
+ struct {
+ grpc_metadata_array trailing_metadata;
+ char *status_details;
+ } send_status_from_server;
+ grpc_metadata_array recv_initial_metadata;
+ grpc_byte_buffer *recv_message;
+ struct {
+ grpc_metadata_array trailing_metadata;
+ grpc_status_code status;
+ char *status_details;
+ size_t status_details_capacity;
+ } recv_status_on_client;
+ int recv_close_on_server_cancelled;
+ struct {
+ grpc_call *call;
+ grpc_call_details call_details;
+ grpc_metadata_array request_metadata;
+ } server_rpc_new;
+
+ /* callback will be called upon completion */
+ callback_funcptr callback;
+
+} grpcsharp_batch_context;
+
+grpcsharp_batch_context *grpcsharp_batch_context_create() {
+ grpcsharp_batch_context *ctx = gpr_malloc(sizeof(grpcsharp_batch_context));
+ memset(ctx, 0, sizeof(grpcsharp_batch_context));
+ return ctx;
+}
+
+/**
+ * Destroys metadata array including keys and values.
+ */
+void grpcsharp_metadata_array_destroy_recursive(grpc_metadata_array *array) {
+ if (!array->metadata) {
+ return;
+ }
+ /* TODO: destroy also keys and values */
+ grpc_metadata_array_destroy(array);
+}
+
+void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
+ if (!ctx) {
+ return;
+ }
+ grpcsharp_metadata_array_destroy_recursive(&(ctx->send_initial_metadata));
+
+ grpc_byte_buffer_destroy(ctx->send_message);
+
+ grpcsharp_metadata_array_destroy_recursive(
+ &(ctx->send_status_from_server.trailing_metadata));
+ gpr_free(ctx->send_status_from_server.status_details);
+
+ grpc_metadata_array_destroy(&(ctx->recv_initial_metadata));
+
+ grpc_byte_buffer_destroy(ctx->recv_message);
+
+ grpc_metadata_array_destroy(&(ctx->recv_status_on_client.trailing_metadata));
+ gpr_free((void *)ctx->recv_status_on_client.status_details);
+
+ /* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is
+ supposed
+ to take its ownership. */
+
+ grpc_call_details_destroy(&(ctx->server_rpc_new.call_details));
+ grpc_metadata_array_destroy(&(ctx->server_rpc_new.request_metadata));
+
+ gpr_free(ctx);
+}
+
+GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_batch_context_recv_message_length(
+ const grpcsharp_batch_context *ctx) {
+ if (!ctx->recv_message) {
+ return -1;
+ }
+ return grpc_byte_buffer_length(ctx->recv_message);
+}
+
+/*
+ * Copies data from recv_message to a buffer. Fatal error occurs if
+ * buffer is too small.
+ */
+GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_recv_message_to_buffer(
+ const grpcsharp_batch_context *ctx, char *buffer, size_t buffer_len) {
+ grpc_byte_buffer_reader *reader;
+ gpr_slice slice;
+ size_t offset = 0;
+
+ reader = grpc_byte_buffer_reader_create(ctx->recv_message);
+
+ while (grpc_byte_buffer_reader_next(reader, &slice)) {
+ size_t len = GPR_SLICE_LENGTH(slice);
+ GPR_ASSERT(offset + len <= buffer_len);
+ memcpy(buffer + offset, GPR_SLICE_START_PTR(slice),
+ GPR_SLICE_LENGTH(slice));
+ offset += len;
+ gpr_slice_unref(slice);
+ }
+ grpc_byte_buffer_reader_destroy(reader);
+}
+
+GPR_EXPORT grpc_status_code GPR_CALLTYPE
+grpcsharp_batch_context_recv_status_on_client_status(
+ const grpcsharp_batch_context *ctx) {
+ return ctx->recv_status_on_client.status;
+}
+
+GPR_EXPORT const char *GPR_CALLTYPE
+grpcsharp_batch_context_recv_status_on_client_details(
+ const grpcsharp_batch_context *ctx) {
+ return ctx->recv_status_on_client.status_details;
+}
+
+GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_call(
+ const grpcsharp_batch_context *ctx) {
+ return ctx->server_rpc_new.call;
+}
+
+GPR_EXPORT const char *GPR_CALLTYPE
+grpcsharp_batch_context_server_rpc_new_method(
+ const grpcsharp_batch_context *ctx) {
+ return ctx->server_rpc_new.call_details.method;
+}
+
/* Init & shutdown */
GPR_EXPORT void GPR_CALLTYPE grpcsharp_init(void) { grpc_init(); }
@@ -71,18 +206,6 @@ grpcsharp_completion_queue_create(void) {
return grpc_completion_queue_create();
}
-GPR_EXPORT grpc_event *GPR_CALLTYPE
-grpcsharp_completion_queue_next(grpc_completion_queue *cq,
- gpr_timespec deadline) {
- return grpc_completion_queue_next(cq, deadline);
-}
-
-GPR_EXPORT grpc_event *GPR_CALLTYPE
-grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
- gpr_timespec deadline) {
- return grpc_completion_queue_pluck(cq, tag, deadline);
-}
-
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_completion_queue_shutdown(grpc_completion_queue *cq) {
grpc_completion_queue_shutdown(cq);
@@ -96,12 +219,18 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) {
GPR_EXPORT grpc_completion_type GPR_CALLTYPE
grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) {
grpc_event *ev;
+ grpcsharp_batch_context *batch_context;
grpc_completion_type t;
void(GPR_CALLTYPE * callback)(grpc_event *);
ev = grpc_completion_queue_next(cq, gpr_inf_future);
t = ev->type;
- if (ev->tag) {
+ if (t == GRPC_OP_COMPLETE && ev->tag) {
+ /* NEW API handler */
+ batch_context = (grpcsharp_batch_context *)ev->tag;
+ batch_context->callback(ev->data.op_complete, batch_context);
+ grpcsharp_batch_context_destroy(batch_context);
+ } else if (ev->tag) {
/* call the callback in ev->tag */
/* C forbids to cast object pointers to function pointers, so
* we cast to intptr first.
@@ -129,204 +258,286 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) {
}
GPR_EXPORT grpc_call *GPR_CALLTYPE
-grpcsharp_channel_create_call_old(grpc_channel *channel, const char *method,
- const char *host, gpr_timespec deadline) {
- return grpc_channel_create_call_old(channel, method, host, deadline);
+grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq,
+ const char *method, const char *host,
+ gpr_timespec deadline) {
+ return grpc_channel_create_call(channel, cq, method, host, deadline);
}
-/* Event */
+/* Timespec */
-GPR_EXPORT void GPR_CALLTYPE grpcsharp_event_finish(grpc_event *event) {
- grpc_event_finish(event);
-}
+GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(void) { return gpr_now(); }
-GPR_EXPORT grpc_completion_type GPR_CALLTYPE
-grpcsharp_event_type(const grpc_event *event) {
- return event->type;
+GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_future(void) {
+ return gpr_inf_future;
}
-GPR_EXPORT grpc_op_error GPR_CALLTYPE
-grpcsharp_event_write_accepted(const grpc_event *event) {
- GPR_ASSERT(event->type == GRPC_WRITE_ACCEPTED);
- return event->data.invoke_accepted;
+GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) {
+ return sizeof(gpr_timespec);
}
-GPR_EXPORT grpc_op_error GPR_CALLTYPE
-grpcsharp_event_finish_accepted(const grpc_event *event) {
- GPR_ASSERT(event->type == GRPC_FINISH_ACCEPTED);
- return event->data.finish_accepted;
-}
+/* Call */
-GPR_EXPORT grpc_status_code GPR_CALLTYPE
-grpcsharp_event_finished_status(const grpc_event *event) {
- GPR_ASSERT(event->type == GRPC_FINISHED);
- return event->data.finished.status;
+GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_cancel(grpc_call *call) {
+ return grpc_call_cancel(call);
}
-GPR_EXPORT const char *GPR_CALLTYPE
-grpcsharp_event_finished_details(const grpc_event *event) {
- GPR_ASSERT(event->type == GRPC_FINISHED);
- return event->data.finished.details;
+GPR_EXPORT grpc_call_error GPR_CALLTYPE
+grpcsharp_call_cancel_with_status(grpc_call *call, grpc_status_code status,
+ const char *description) {
+ return grpc_call_cancel_with_status(call, status, description);
}
-GPR_EXPORT gpr_intptr GPR_CALLTYPE
-grpcsharp_event_read_length(const grpc_event *event) {
- GPR_ASSERT(event->type == GRPC_READ);
- if (!event->data.read) {
- return -1;
- }
- return grpc_byte_buffer_length(event->data.read);
+GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
+ grpc_call_destroy(call);
}
-/*
- * Copies data from read event to a buffer. Fatal error occurs if
- * buffer is too small.
- */
GPR_EXPORT void GPR_CALLTYPE
-grpcsharp_event_read_copy_to_buffer(const grpc_event *event, char *buffer,
- size_t buffer_len) {
- grpc_byte_buffer_reader *reader;
- gpr_slice slice;
- size_t offset = 0;
+grpcsharp_call_start_write_from_copied_buffer(grpc_call *call,
+ const char *buffer, size_t len,
+ void *tag, gpr_uint32 flags) {
+ grpc_byte_buffer *byte_buffer = string_to_byte_buffer(buffer, len);
+ GPR_ASSERT(grpc_call_start_write_old(call, byte_buffer, tag, flags) ==
+ GRPC_CALL_OK);
+ grpc_byte_buffer_destroy(byte_buffer);
+}
- GPR_ASSERT(event->type == GRPC_READ);
- reader = grpc_byte_buffer_reader_create(event->data.read);
+GPR_EXPORT grpc_call_error GPR_CALLTYPE
+grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
+ const char *send_buffer, size_t send_buffer_len) {
+ /* TODO: don't use magic number */
+ grpc_op ops[6];
+ grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
+ ctx->callback = callback;
- GPR_ASSERT(event->data.read);
- while (grpc_byte_buffer_reader_next(reader, &slice)) {
- size_t len = GPR_SLICE_LENGTH(slice);
- GPR_ASSERT(offset + len <= buffer_len);
- memcpy(buffer + offset, GPR_SLICE_START_PTR(slice),
- GPR_SLICE_LENGTH(slice));
- offset += len;
- gpr_slice_unref(slice);
- }
- grpc_byte_buffer_reader_destroy(reader);
-}
+ /* TODO: implement sending the metadata... */
+ ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
+ /* ctx->send_initial_metadata is already zeroed out. */
+ ops[0].data.send_initial_metadata.count = 0;
+ ops[0].data.send_initial_metadata.metadata = NULL;
-GPR_EXPORT grpc_call *GPR_CALLTYPE
-grpcsharp_event_call(const grpc_event *event) {
- /* we only allow this for newly incoming server calls. */
- GPR_ASSERT(event->type == GRPC_SERVER_RPC_NEW);
- return event->call;
-}
+ ops[1].op = GRPC_OP_SEND_MESSAGE;
+ ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
+ ops[1].data.send_message = ctx->send_message;
-GPR_EXPORT const char *GPR_CALLTYPE
-grpcsharp_event_server_rpc_new_method(const grpc_event *event) {
- GPR_ASSERT(event->type == GRPC_SERVER_RPC_NEW);
- return event->data.server_rpc_new.method;
-}
+ ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
-/* Timespec */
+ ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
+ ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
-GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(void) { return gpr_now(); }
+ ops[4].op = GRPC_OP_RECV_MESSAGE;
+ ops[4].data.recv_message = &(ctx->recv_message);
-GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_future(void) {
- return gpr_inf_future;
-}
+ ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ ops[5].data.recv_status_on_client.trailing_metadata =
+ &(ctx->recv_status_on_client.trailing_metadata);
+ ops[5].data.recv_status_on_client.status =
+ &(ctx->recv_status_on_client.status);
+ /* not using preallocation for status_details */
+ ops[5].data.recv_status_on_client.status_details =
+ &(ctx->recv_status_on_client.status_details);
+ ops[5].data.recv_status_on_client.status_details_capacity =
+ &(ctx->recv_status_on_client.status_details_capacity);
-GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) {
- return sizeof(gpr_timespec);
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
-/* Call */
-
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_add_metadata_old(grpc_call *call, grpc_metadata *metadata,
- gpr_uint32 flags) {
- return grpc_call_add_metadata_old(call, metadata, flags);
+grpcsharp_call_start_client_streaming(grpc_call *call,
+ callback_funcptr callback) {
+ /* TODO: don't use magic number */
+ grpc_op ops[4];
+ grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
+ ctx->callback = callback;
+
+ /* TODO: implement sending the metadata... */
+ ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
+ /* ctx->send_initial_metadata is already zeroed out. */
+ ops[0].data.send_initial_metadata.count = 0;
+ ops[0].data.send_initial_metadata.metadata = NULL;
+
+ ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
+ ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
+
+ ops[2].op = GRPC_OP_RECV_MESSAGE;
+ ops[2].data.recv_message = &(ctx->recv_message);
+
+ ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ ops[3].data.recv_status_on_client.trailing_metadata =
+ &(ctx->recv_status_on_client.trailing_metadata);
+ ops[3].data.recv_status_on_client.status =
+ &(ctx->recv_status_on_client.status);
+ /* not using preallocation for status_details */
+ ops[3].data.recv_status_on_client.status_details =
+ &(ctx->recv_status_on_client.status_details);
+ ops[3].data.recv_status_on_client.status_details_capacity =
+ &(ctx->recv_status_on_client.status_details_capacity);
+
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
- void *metadata_read_tag, void *finished_tag,
- gpr_uint32 flags) {
- return grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag, flags);
+grpcsharp_call_start_server_streaming(grpc_call *call,
+ callback_funcptr callback,
+ const char *send_buffer,
+ size_t send_buffer_len) {
+ /* TODO: don't use magic number */
+ grpc_op ops[5];
+ grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
+ ctx->callback = callback;
+
+ /* TODO: implement sending the metadata... */
+ ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
+ /* ctx->send_initial_metadata is already zeroed out. */
+ ops[0].data.send_initial_metadata.count = 0;
+ ops[0].data.send_initial_metadata.metadata = NULL;
+
+ ops[1].op = GRPC_OP_SEND_MESSAGE;
+ ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
+ ops[1].data.send_message = ctx->send_message;
+
+ ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+
+ ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
+ ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
+
+ ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ ops[4].data.recv_status_on_client.trailing_metadata =
+ &(ctx->recv_status_on_client.trailing_metadata);
+ ops[4].data.recv_status_on_client.status =
+ &(ctx->recv_status_on_client.status);
+ /* not using preallocation for status_details */
+ ops[4].data.recv_status_on_client.status_details =
+ &(ctx->recv_status_on_client.status_details);
+ ops[4].data.recv_status_on_client.status_details_capacity =
+ &(ctx->recv_status_on_client.status_details_capacity);
+
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_server_accept_old(grpc_call *call, grpc_completion_queue *cq,
- void *finished_tag) {
- return grpc_call_server_accept_old(call, cq, finished_tag);
+grpcsharp_call_start_duplex_streaming(grpc_call *call,
+ callback_funcptr callback) {
+ /* TODO: don't use magic number */
+ grpc_op ops[3];
+ grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
+ ctx->callback = callback;
+
+ /* TODO: implement sending the metadata... */
+ ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
+ /* ctx->send_initial_metadata is already zeroed out. */
+ ops[0].data.send_initial_metadata.count = 0;
+ ops[0].data.send_initial_metadata.metadata = NULL;
+
+ ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
+ ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
+
+ ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ ops[2].data.recv_status_on_client.trailing_metadata =
+ &(ctx->recv_status_on_client.trailing_metadata);
+ ops[2].data.recv_status_on_client.status =
+ &(ctx->recv_status_on_client.status);
+ /* not using preallocation for status_details */
+ ops[2].data.recv_status_on_client.status_details =
+ &(ctx->recv_status_on_client.status_details);
+ ops[2].data.recv_status_on_client.status_details_capacity =
+ &(ctx->recv_status_on_client.status_details_capacity);
+
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_server_end_initial_metadata_old(grpc_call *call,
- gpr_uint32 flags) {
- return grpc_call_server_end_initial_metadata_old(call, flags);
-}
+grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback,
+ const char *send_buffer, size_t send_buffer_len) {
+ /* TODO: don't use magic number */
+ grpc_op ops[1];
+ grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
+ ctx->callback = callback;
-GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_cancel(grpc_call *call) {
- return grpc_call_cancel(call);
-}
+ ops[0].op = GRPC_OP_SEND_MESSAGE;
+ ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
+ ops[0].data.send_message = ctx->send_message;
-GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_cancel_with_status(grpc_call *call, grpc_status_code status,
- const char *description) {
- return grpc_call_cancel_with_status(call, status, description);
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_start_write_old(grpc_call *call, grpc_byte_buffer *byte_buffer,
- void *tag, gpr_uint32 flags) {
- return grpc_call_start_write_old(call, byte_buffer, tag, flags);
+grpcsharp_call_send_close_from_client(grpc_call *call,
+ callback_funcptr callback) {
+ /* TODO: don't use magic number */
+ grpc_op ops[1];
+ grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
+ ctx->callback = callback;
+
+ ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_start_write_status_old(grpc_call *call,
- grpc_status_code status_code,
- const char *status_message, void *tag) {
- return grpc_call_start_write_status_old(call, status_code, status_message,
- tag);
+grpcsharp_call_send_status_from_server(grpc_call *call,
+ callback_funcptr callback,
+ grpc_status_code status_code,
+ const char *status_details) {
+ /* TODO: don't use magic number */
+ grpc_op ops[1];
+ grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
+ ctx->callback = callback;
+
+ ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ ops[0].data.send_status_from_server.status = status_code;
+ ops[0].data.send_status_from_server.status_details =
+ gpr_strdup(status_details);
+ ops[0].data.send_status_from_server.trailing_metadata = NULL;
+ ops[0].data.send_status_from_server.trailing_metadata_count = 0;
+
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_writes_done_old(grpc_call *call, void *tag) {
- return grpc_call_writes_done_old(call, tag);
+grpcsharp_call_recv_message(grpc_call *call, callback_funcptr callback) {
+ /* TODO: don't use magic number */
+ grpc_op ops[1];
+ grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
+ ctx->callback = callback;
+
+ ops[0].op = GRPC_OP_RECV_MESSAGE;
+ ops[0].data.recv_message = &(ctx->recv_message);
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_start_read_old(grpc_call *call, void *tag) {
- return grpc_call_start_read_old(call, tag);
-}
+grpcsharp_call_start_serverside(grpc_call *call, callback_funcptr callback) {
+ /* TODO: don't use magic number */
+ grpc_op ops[2];
-GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
- grpc_call_destroy(call);
-}
+ grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
+ ctx->callback = callback;
-GPR_EXPORT void GPR_CALLTYPE
-grpcsharp_call_start_write_from_copied_buffer(grpc_call *call,
- const char *buffer, size_t len,
- void *tag, gpr_uint32 flags) {
- grpc_byte_buffer *byte_buffer = string_to_byte_buffer(buffer, len);
- GPR_ASSERT(grpc_call_start_write_old(call, byte_buffer, tag, flags) ==
- GRPC_CALL_OK);
- grpc_byte_buffer_destroy(byte_buffer);
-}
+ ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
+ ops[0].data.send_initial_metadata.count = 0;
+ ops[0].data.send_initial_metadata.metadata = NULL;
-/* Server */
+ ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops[1].data.recv_close_on_server.cancelled =
+ (&ctx->recv_close_on_server_cancelled);
-GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_server_request_call_old(grpc_server *server, void *tag_new) {
- return grpc_server_request_call_old(server, tag_new);
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
+/* Server */
+
GPR_EXPORT grpc_server *GPR_CALLTYPE
grpcsharp_server_create(grpc_completion_queue *cq,
const grpc_channel_args *args) {
return grpc_server_create(cq, args);
}
-GPR_EXPORT int GPR_CALLTYPE
+GPR_EXPORT gpr_int32 GPR_CALLTYPE
grpcsharp_server_add_http2_port(grpc_server *server, const char *addr) {
return grpc_server_add_http2_port(server, addr);
}
-GPR_EXPORT int GPR_CALLTYPE
-grpcsharp_server_add_secure_http2_port(grpc_server *server, const char *addr) {
- return grpc_server_add_secure_http2_port(server, addr);
-}
-
GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_start(grpc_server *server) {
grpc_server_start(server);
}
@@ -343,3 +554,14 @@ grpcsharp_server_shutdown_and_notify(grpc_server *server, void *tag) {
GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) {
grpc_server_destroy(server);
}
+
+GPR_EXPORT grpc_call_error GPR_CALLTYPE
+grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
+ callback_funcptr callback) {
+ grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
+ ctx->callback = callback;
+
+ return grpc_server_request_call(
+ server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details),
+ &(ctx->server_rpc_new.request_metadata), cq, ctx);
+}