aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2018-11-15 09:06:47 +0100
committerGravatar GitHub <noreply@github.com>2018-11-15 09:06:47 +0100
commitdc1bbf9932b8de85e96c92db9c7f43219498b58c (patch)
tree3d7056e6e5024d4f7cdedbd5cb965d0171849bf4 /src
parentcba17be464bdae66337f218de83ebf45b3858d82 (diff)
parent5344c81f3c7701a6b1864faa2da91041c1e29e03 (diff)
Merge pull request #17167 from jtattermusch/switch_to_contextual_serializer
Switch C# to contextual serializer and deserializer internally
Diffstat (limited to 'src')
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs2
-rw-r--r--src/csharp/Grpc.Core.Tests/MarshallerTest.cs7
-rw-r--r--src/csharp/Grpc.Core/DeserializationContext.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs27
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs66
-rw-r--r--src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs62
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs16
-rw-r--r--src/csharp/Grpc.Core/Marshaller.cs67
-rw-r--r--src/csharp/Grpc.Core/SerializationContext.cs7
11 files changed, 181 insertions, 84 deletions
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
index e7d8939978..5c7d48f786 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -49,7 +49,7 @@ namespace Grpc.Core.Internal.Tests
fakeCall = new FakeNativeCall();
asyncCallServer = new AsyncCallServer<string, string>(
- Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer,
+ Marshallers.StringMarshaller.ContextualSerializer, Marshallers.StringMarshaller.ContextualDeserializer,
server);
asyncCallServer.InitializeForTesting(fakeCall);
}
diff --git a/src/csharp/Grpc.Core.Tests/MarshallerTest.cs b/src/csharp/Grpc.Core.Tests/MarshallerTest.cs
index 97f64a0575..ad3e81d61f 100644
--- a/src/csharp/Grpc.Core.Tests/MarshallerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/MarshallerTest.cs
@@ -69,11 +69,8 @@ namespace Grpc.Core.Tests
Assert.AreSame(contextualSerializer, marshaller.ContextualSerializer);
Assert.AreSame(contextualDeserializer, marshaller.ContextualDeserializer);
-
- // test that emulated serializer and deserializer work
- var origMsg = "abc";
- var serialized = marshaller.Serializer(origMsg);
- Assert.AreEqual(origMsg, marshaller.Deserializer(serialized));
+ Assert.Throws(typeof(NotImplementedException), () => marshaller.Serializer("abc"));
+ Assert.Throws(typeof(NotImplementedException), () => marshaller.Deserializer(new byte[] {1, 2, 3}));
}
class FakeSerializationContext : SerializationContext
diff --git a/src/csharp/Grpc.Core/DeserializationContext.cs b/src/csharp/Grpc.Core/DeserializationContext.cs
index 5b6372ef85..d69e0db5bd 100644
--- a/src/csharp/Grpc.Core/DeserializationContext.cs
+++ b/src/csharp/Grpc.Core/DeserializationContext.cs
@@ -16,6 +16,8 @@
#endregion
+using System;
+
namespace Grpc.Core
{
/// <summary>
@@ -41,6 +43,9 @@ namespace Grpc.Core
/// (as there is no practical reason for doing so) and <c>DeserializationContext</c> implementations are free to assume so.
/// </summary>
/// <returns>byte array containing the entire payload.</returns>
- public abstract byte[] PayloadAsNewBuffer();
+ public virtual byte[] PayloadAsNewBuffer()
+ {
+ throw new NotImplementedException();
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 4cdf0ee6a7..b6d687f71e 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -54,7 +54,7 @@ namespace Grpc.Core.Internal
ClientSideStatus? finishedStatus;
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
- : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
+ : base(callDetails.RequestMarshaller.ContextualSerializer, callDetails.ResponseMarshaller.ContextualDeserializer)
{
this.details = callDetails.WithOptions(callDetails.Options.Normalize());
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index a93dc34620..39c9f7c616 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -40,8 +40,8 @@ namespace Grpc.Core.Internal
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();
protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");
- readonly Func<TWrite, byte[]> serializer;
- readonly Func<byte[], TRead> deserializer;
+ readonly Action<TWrite, SerializationContext> serializer;
+ readonly Func<DeserializationContext, TRead> deserializer;
protected readonly object myLock = new object();
@@ -63,7 +63,7 @@ namespace Grpc.Core.Internal
protected bool initialMetadataSent;
protected long streamingWritesCounter; // Number of streaming send operations started so far.
- public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
+ public AsyncCallBase(Action<TWrite, SerializationContext> serializer, Func<DeserializationContext, TRead> deserializer)
{
this.serializer = GrpcPreconditions.CheckNotNull(serializer);
this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
@@ -215,14 +215,26 @@ namespace Grpc.Core.Internal
protected byte[] UnsafeSerialize(TWrite msg)
{
- return serializer(msg);
+ DefaultSerializationContext context = null;
+ try
+ {
+ context = DefaultSerializationContext.GetInitializedThreadLocal();
+ serializer(msg, context);
+ return context.GetPayload();
+ }
+ finally
+ {
+ context?.Reset();
+ }
}
protected Exception TryDeserialize(byte[] payload, out TRead msg)
{
+ DefaultDeserializationContext context = null;
try
{
- msg = deserializer(payload);
+ context = DefaultDeserializationContext.GetInitializedThreadLocal(payload);
+ msg = deserializer(context);
return null;
}
catch (Exception e)
@@ -230,6 +242,11 @@ namespace Grpc.Core.Internal
msg = default(TRead);
return e;
}
+ finally
+ {
+ context?.Reset();
+
+ }
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 0ceca4abb8..0bf1fb3b7d 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -37,7 +37,7 @@ namespace Grpc.Core.Internal
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly Server server;
- public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server) : base(serializer, deserializer)
+ public AsyncCallServer(Action<TResponse, SerializationContext> serializer, Func<DeserializationContext, TRequest> deserializer, Server server) : base(serializer, deserializer)
{
this.server = GrpcPreconditions.CheckNotNull(server);
}
diff --git a/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs b/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs
new file mode 100644
index 0000000000..7ace80e8d5
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs
@@ -0,0 +1,66 @@
+#region Copyright notice and license
+
+// Copyright 2018 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using Grpc.Core.Utils;
+using System;
+using System.Threading;
+
+namespace Grpc.Core.Internal
+{
+ internal class DefaultDeserializationContext : DeserializationContext
+ {
+ static readonly ThreadLocal<DefaultDeserializationContext> threadLocalInstance =
+ new ThreadLocal<DefaultDeserializationContext>(() => new DefaultDeserializationContext(), false);
+
+ byte[] payload;
+ bool alreadyCalledPayloadAsNewBuffer;
+
+ public DefaultDeserializationContext()
+ {
+ Reset();
+ }
+
+ public override int PayloadLength => payload.Length;
+
+ public override byte[] PayloadAsNewBuffer()
+ {
+ GrpcPreconditions.CheckState(!alreadyCalledPayloadAsNewBuffer);
+ alreadyCalledPayloadAsNewBuffer = true;
+ return payload;
+ }
+
+ public void Initialize(byte[] payload)
+ {
+ this.payload = GrpcPreconditions.CheckNotNull(payload);
+ this.alreadyCalledPayloadAsNewBuffer = false;
+ }
+
+ public void Reset()
+ {
+ this.payload = null;
+ this.alreadyCalledPayloadAsNewBuffer = true; // mark payload as read
+ }
+
+ public static DefaultDeserializationContext GetInitializedThreadLocal(byte[] payload)
+ {
+ var instance = threadLocalInstance.Value;
+ instance.Initialize(payload);
+ return instance;
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs b/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs
new file mode 100644
index 0000000000..cceb194879
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs
@@ -0,0 +1,62 @@
+#region Copyright notice and license
+
+// Copyright 2018 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using Grpc.Core.Utils;
+using System.Threading;
+
+namespace Grpc.Core.Internal
+{
+ internal class DefaultSerializationContext : SerializationContext
+ {
+ static readonly ThreadLocal<DefaultSerializationContext> threadLocalInstance =
+ new ThreadLocal<DefaultSerializationContext>(() => new DefaultSerializationContext(), false);
+
+ bool isComplete;
+ byte[] payload;
+
+ public DefaultSerializationContext()
+ {
+ Reset();
+ }
+
+ public override void Complete(byte[] payload)
+ {
+ GrpcPreconditions.CheckState(!isComplete);
+ this.isComplete = true;
+ this.payload = payload;
+ }
+
+ internal byte[] GetPayload()
+ {
+ return this.payload;
+ }
+
+ public void Reset()
+ {
+ this.isComplete = false;
+ this.payload = null;
+ }
+
+ public static DefaultSerializationContext GetInitializedThreadLocal()
+ {
+ var instance = threadLocalInstance.Value;
+ instance.Reset();
+ return instance;
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 81522cf8fe..ec732e8c7f 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -52,8 +52,8 @@ namespace Grpc.Core.Internal
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
- method.ResponseMarshaller.Serializer,
- method.RequestMarshaller.Deserializer,
+ method.ResponseMarshaller.ContextualSerializer,
+ method.RequestMarshaller.ContextualDeserializer,
newRpc.Server);
asyncCall.Initialize(newRpc.Call, cq);
@@ -116,8 +116,8 @@ namespace Grpc.Core.Internal
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
- method.ResponseMarshaller.Serializer,
- method.RequestMarshaller.Deserializer,
+ method.ResponseMarshaller.ContextualSerializer,
+ method.RequestMarshaller.ContextualDeserializer,
newRpc.Server);
asyncCall.Initialize(newRpc.Call, cq);
@@ -179,8 +179,8 @@ namespace Grpc.Core.Internal
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
- method.ResponseMarshaller.Serializer,
- method.RequestMarshaller.Deserializer,
+ method.ResponseMarshaller.ContextualSerializer,
+ method.RequestMarshaller.ContextualDeserializer,
newRpc.Server);
asyncCall.Initialize(newRpc.Call, cq);
@@ -242,8 +242,8 @@ namespace Grpc.Core.Internal
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
- method.ResponseMarshaller.Serializer,
- method.RequestMarshaller.Deserializer,
+ method.ResponseMarshaller.ContextualSerializer,
+ method.RequestMarshaller.ContextualDeserializer,
newRpc.Server);
asyncCall.Initialize(newRpc.Call, cq);
diff --git a/src/csharp/Grpc.Core/Marshaller.cs b/src/csharp/Grpc.Core/Marshaller.cs
index 0af9aa586b..34a1849cd7 100644
--- a/src/csharp/Grpc.Core/Marshaller.cs
+++ b/src/csharp/Grpc.Core/Marshaller.cs
@@ -41,6 +41,8 @@ namespace Grpc.Core
{
this.serializer = GrpcPreconditions.CheckNotNull(serializer, nameof(serializer));
this.deserializer = GrpcPreconditions.CheckNotNull(deserializer, nameof(deserializer));
+ // contextual serialization/deserialization is emulated to make the marshaller
+ // usable with the grpc library (required for backward compatibility).
this.contextualSerializer = EmulateContextualSerializer;
this.contextualDeserializer = EmulateContextualDeserializer;
}
@@ -57,10 +59,10 @@ namespace Grpc.Core
{
this.contextualSerializer = GrpcPreconditions.CheckNotNull(serializer, nameof(serializer));
this.contextualDeserializer = GrpcPreconditions.CheckNotNull(deserializer, nameof(deserializer));
- // TODO(jtattermusch): once gRPC C# library switches to using contextual (de)serializer,
- // emulating the simple (de)serializer will become unnecessary.
- this.serializer = EmulateSimpleSerializer;
- this.deserializer = EmulateSimpleDeserializer;
+ // gRPC only uses contextual serializer/deserializer internally, so emulating the legacy
+ // (de)serializer is not necessary.
+ this.serializer = (msg) => { throw new NotImplementedException(); };
+ this.deserializer = (payload) => { throw new NotImplementedException(); };
}
/// <summary>
@@ -85,25 +87,6 @@ namespace Grpc.Core
/// </summary>
public Func<DeserializationContext, T> ContextualDeserializer => this.contextualDeserializer;
- // for backward compatibility, emulate the simple serializer using the contextual one
- private byte[] EmulateSimpleSerializer(T msg)
- {
- // TODO(jtattermusch): avoid the allocation by passing a thread-local instance
- // This code will become unnecessary once gRPC C# library switches to using contextual (de)serializer.
- var context = new EmulatedSerializationContext();
- this.contextualSerializer(msg, context);
- return context.GetPayload();
- }
-
- // for backward compatibility, emulate the simple deserializer using the contextual one
- private T EmulateSimpleDeserializer(byte[] payload)
- {
- // TODO(jtattermusch): avoid the allocation by passing a thread-local instance
- // This code will become unnecessary once gRPC C# library switches to using contextual (de)serializer.
- var context = new EmulatedDeserializationContext(payload);
- return this.contextualDeserializer(context);
- }
-
// for backward compatibility, emulate the contextual serializer using the simple one
private void EmulateContextualSerializer(T message, SerializationContext context)
{
@@ -116,44 +99,6 @@ namespace Grpc.Core
{
return this.deserializer(context.PayloadAsNewBuffer());
}
-
- internal class EmulatedSerializationContext : SerializationContext
- {
- bool isComplete;
- byte[] payload;
-
- public override void Complete(byte[] payload)
- {
- GrpcPreconditions.CheckState(!isComplete);
- this.isComplete = true;
- this.payload = payload;
- }
-
- internal byte[] GetPayload()
- {
- return this.payload;
- }
- }
-
- internal class EmulatedDeserializationContext : DeserializationContext
- {
- readonly byte[] payload;
- bool alreadyCalledPayloadAsNewBuffer;
-
- public EmulatedDeserializationContext(byte[] payload)
- {
- this.payload = GrpcPreconditions.CheckNotNull(payload);
- }
-
- public override int PayloadLength => payload.Length;
-
- public override byte[] PayloadAsNewBuffer()
- {
- GrpcPreconditions.CheckState(!alreadyCalledPayloadAsNewBuffer);
- alreadyCalledPayloadAsNewBuffer = true;
- return payload;
- }
- }
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/SerializationContext.cs b/src/csharp/Grpc.Core/SerializationContext.cs
index cf4d1595da..9aef2adbcd 100644
--- a/src/csharp/Grpc.Core/SerializationContext.cs
+++ b/src/csharp/Grpc.Core/SerializationContext.cs
@@ -16,6 +16,8 @@
#endregion
+using System;
+
namespace Grpc.Core
{
/// <summary>
@@ -29,6 +31,9 @@ namespace Grpc.Core
/// payload which must not be accessed afterwards.
/// </summary>
/// <param name="payload">the serialized form of current message</param>
- public abstract void Complete(byte[] payload);
+ public virtual void Complete(byte[] payload)
+ {
+ throw new NotImplementedException();
+ }
}
}