From 5344c81f3c7701a6b1864faa2da91041c1e29e03 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 16 Aug 2018 17:19:27 +0200 Subject: switch C# to contextual serializer and deserializer internally --- .../Internal/AsyncCallServerTest.cs | 2 +- src/csharp/Grpc.Core.Tests/MarshallerTest.cs | 7 +-- src/csharp/Grpc.Core/DeserializationContext.cs | 7 ++- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 2 +- src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 27 +++++++-- src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 2 +- .../Internal/DefaultDeserializationContext.cs | 66 +++++++++++++++++++++ .../Internal/DefaultSerializationContext.cs | 62 ++++++++++++++++++++ src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 16 +++--- src/csharp/Grpc.Core/Marshaller.cs | 67 ++-------------------- src/csharp/Grpc.Core/SerializationContext.cs | 7 ++- 11 files changed, 181 insertions(+), 84 deletions(-) create mode 100644 src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs create mode 100644 src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs (limited to 'src/csharp') diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index e7d8939978..5c7d48f786 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -49,7 +49,7 @@ namespace Grpc.Core.Internal.Tests fakeCall = new FakeNativeCall(); asyncCallServer = new AsyncCallServer( - Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer, + Marshallers.StringMarshaller.ContextualSerializer, Marshallers.StringMarshaller.ContextualDeserializer, server); asyncCallServer.InitializeForTesting(fakeCall); } diff --git a/src/csharp/Grpc.Core.Tests/MarshallerTest.cs b/src/csharp/Grpc.Core.Tests/MarshallerTest.cs index 97f64a0575..ad3e81d61f 100644 --- a/src/csharp/Grpc.Core.Tests/MarshallerTest.cs +++ b/src/csharp/Grpc.Core.Tests/MarshallerTest.cs @@ -69,11 +69,8 @@ namespace Grpc.Core.Tests Assert.AreSame(contextualSerializer, marshaller.ContextualSerializer); Assert.AreSame(contextualDeserializer, marshaller.ContextualDeserializer); - - // test that emulated serializer and deserializer work - var origMsg = "abc"; - var serialized = marshaller.Serializer(origMsg); - Assert.AreEqual(origMsg, marshaller.Deserializer(serialized)); + Assert.Throws(typeof(NotImplementedException), () => marshaller.Serializer("abc")); + Assert.Throws(typeof(NotImplementedException), () => marshaller.Deserializer(new byte[] {1, 2, 3})); } class FakeSerializationContext : SerializationContext diff --git a/src/csharp/Grpc.Core/DeserializationContext.cs b/src/csharp/Grpc.Core/DeserializationContext.cs index 5b6372ef85..d69e0db5bd 100644 --- a/src/csharp/Grpc.Core/DeserializationContext.cs +++ b/src/csharp/Grpc.Core/DeserializationContext.cs @@ -16,6 +16,8 @@ #endregion +using System; + namespace Grpc.Core { /// @@ -41,6 +43,9 @@ namespace Grpc.Core /// (as there is no practical reason for doing so) and DeserializationContext implementations are free to assume so. /// /// byte array containing the entire payload. - public abstract byte[] PayloadAsNewBuffer(); + public virtual byte[] PayloadAsNewBuffer() + { + throw new NotImplementedException(); + } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 4cdf0ee6a7..b6d687f71e 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -54,7 +54,7 @@ namespace Grpc.Core.Internal ClientSideStatus? finishedStatus; public AsyncCall(CallInvocationDetails callDetails) - : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer) + : base(callDetails.RequestMarshaller.ContextualSerializer, callDetails.ResponseMarshaller.ContextualDeserializer) { this.details = callDetails.WithOptions(callDetails.Options.Normalize()); this.initialMetadataSent = true; // we always send metadata at the very beginning of the call. diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index a93dc34620..39c9f7c616 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -40,8 +40,8 @@ namespace Grpc.Core.Internal static readonly ILogger Logger = GrpcEnvironment.Logger.ForType>(); protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message."); - readonly Func serializer; - readonly Func deserializer; + readonly Action serializer; + readonly Func deserializer; protected readonly object myLock = new object(); @@ -63,7 +63,7 @@ namespace Grpc.Core.Internal protected bool initialMetadataSent; protected long streamingWritesCounter; // Number of streaming send operations started so far. - public AsyncCallBase(Func serializer, Func deserializer) + public AsyncCallBase(Action serializer, Func deserializer) { this.serializer = GrpcPreconditions.CheckNotNull(serializer); this.deserializer = GrpcPreconditions.CheckNotNull(deserializer); @@ -215,14 +215,26 @@ namespace Grpc.Core.Internal protected byte[] UnsafeSerialize(TWrite msg) { - return serializer(msg); + DefaultSerializationContext context = null; + try + { + context = DefaultSerializationContext.GetInitializedThreadLocal(); + serializer(msg, context); + return context.GetPayload(); + } + finally + { + context?.Reset(); + } } protected Exception TryDeserialize(byte[] payload, out TRead msg) { + DefaultDeserializationContext context = null; try { - msg = deserializer(payload); + context = DefaultDeserializationContext.GetInitializedThreadLocal(payload); + msg = deserializer(context); return null; } catch (Exception e) @@ -230,6 +242,11 @@ namespace Grpc.Core.Internal msg = default(TRead); return e; } + finally + { + context?.Reset(); + + } } /// diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 0ceca4abb8..0bf1fb3b7d 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -37,7 +37,7 @@ namespace Grpc.Core.Internal readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); readonly Server server; - public AsyncCallServer(Func serializer, Func deserializer, Server server) : base(serializer, deserializer) + public AsyncCallServer(Action serializer, Func deserializer, Server server) : base(serializer, deserializer) { this.server = GrpcPreconditions.CheckNotNull(server); } diff --git a/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs b/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs new file mode 100644 index 0000000000..7ace80e8d5 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs @@ -0,0 +1,66 @@ +#region Copyright notice and license + +// Copyright 2018 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using Grpc.Core.Utils; +using System; +using System.Threading; + +namespace Grpc.Core.Internal +{ + internal class DefaultDeserializationContext : DeserializationContext + { + static readonly ThreadLocal threadLocalInstance = + new ThreadLocal(() => new DefaultDeserializationContext(), false); + + byte[] payload; + bool alreadyCalledPayloadAsNewBuffer; + + public DefaultDeserializationContext() + { + Reset(); + } + + public override int PayloadLength => payload.Length; + + public override byte[] PayloadAsNewBuffer() + { + GrpcPreconditions.CheckState(!alreadyCalledPayloadAsNewBuffer); + alreadyCalledPayloadAsNewBuffer = true; + return payload; + } + + public void Initialize(byte[] payload) + { + this.payload = GrpcPreconditions.CheckNotNull(payload); + this.alreadyCalledPayloadAsNewBuffer = false; + } + + public void Reset() + { + this.payload = null; + this.alreadyCalledPayloadAsNewBuffer = true; // mark payload as read + } + + public static DefaultDeserializationContext GetInitializedThreadLocal(byte[] payload) + { + var instance = threadLocalInstance.Value; + instance.Initialize(payload); + return instance; + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs b/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs new file mode 100644 index 0000000000..cceb194879 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs @@ -0,0 +1,62 @@ +#region Copyright notice and license + +// Copyright 2018 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using Grpc.Core.Utils; +using System.Threading; + +namespace Grpc.Core.Internal +{ + internal class DefaultSerializationContext : SerializationContext + { + static readonly ThreadLocal threadLocalInstance = + new ThreadLocal(() => new DefaultSerializationContext(), false); + + bool isComplete; + byte[] payload; + + public DefaultSerializationContext() + { + Reset(); + } + + public override void Complete(byte[] payload) + { + GrpcPreconditions.CheckState(!isComplete); + this.isComplete = true; + this.payload = payload; + } + + internal byte[] GetPayload() + { + return this.payload; + } + + public void Reset() + { + this.isComplete = false; + this.payload = null; + } + + public static DefaultSerializationContext GetInitializedThreadLocal() + { + var instance = threadLocalInstance.Value; + instance.Reset(); + return instance; + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 81522cf8fe..ec732e8c7f 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -52,8 +52,8 @@ namespace Grpc.Core.Internal public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer( - method.ResponseMarshaller.Serializer, - method.RequestMarshaller.Deserializer, + method.ResponseMarshaller.ContextualSerializer, + method.RequestMarshaller.ContextualDeserializer, newRpc.Server); asyncCall.Initialize(newRpc.Call, cq); @@ -116,8 +116,8 @@ namespace Grpc.Core.Internal public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer( - method.ResponseMarshaller.Serializer, - method.RequestMarshaller.Deserializer, + method.ResponseMarshaller.ContextualSerializer, + method.RequestMarshaller.ContextualDeserializer, newRpc.Server); asyncCall.Initialize(newRpc.Call, cq); @@ -179,8 +179,8 @@ namespace Grpc.Core.Internal public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer( - method.ResponseMarshaller.Serializer, - method.RequestMarshaller.Deserializer, + method.ResponseMarshaller.ContextualSerializer, + method.RequestMarshaller.ContextualDeserializer, newRpc.Server); asyncCall.Initialize(newRpc.Call, cq); @@ -242,8 +242,8 @@ namespace Grpc.Core.Internal public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer( - method.ResponseMarshaller.Serializer, - method.RequestMarshaller.Deserializer, + method.ResponseMarshaller.ContextualSerializer, + method.RequestMarshaller.ContextualDeserializer, newRpc.Server); asyncCall.Initialize(newRpc.Call, cq); diff --git a/src/csharp/Grpc.Core/Marshaller.cs b/src/csharp/Grpc.Core/Marshaller.cs index 0af9aa586b..34a1849cd7 100644 --- a/src/csharp/Grpc.Core/Marshaller.cs +++ b/src/csharp/Grpc.Core/Marshaller.cs @@ -41,6 +41,8 @@ namespace Grpc.Core { this.serializer = GrpcPreconditions.CheckNotNull(serializer, nameof(serializer)); this.deserializer = GrpcPreconditions.CheckNotNull(deserializer, nameof(deserializer)); + // contextual serialization/deserialization is emulated to make the marshaller + // usable with the grpc library (required for backward compatibility). this.contextualSerializer = EmulateContextualSerializer; this.contextualDeserializer = EmulateContextualDeserializer; } @@ -57,10 +59,10 @@ namespace Grpc.Core { this.contextualSerializer = GrpcPreconditions.CheckNotNull(serializer, nameof(serializer)); this.contextualDeserializer = GrpcPreconditions.CheckNotNull(deserializer, nameof(deserializer)); - // TODO(jtattermusch): once gRPC C# library switches to using contextual (de)serializer, - // emulating the simple (de)serializer will become unnecessary. - this.serializer = EmulateSimpleSerializer; - this.deserializer = EmulateSimpleDeserializer; + // gRPC only uses contextual serializer/deserializer internally, so emulating the legacy + // (de)serializer is not necessary. + this.serializer = (msg) => { throw new NotImplementedException(); }; + this.deserializer = (payload) => { throw new NotImplementedException(); }; } /// @@ -85,25 +87,6 @@ namespace Grpc.Core /// public Func ContextualDeserializer => this.contextualDeserializer; - // for backward compatibility, emulate the simple serializer using the contextual one - private byte[] EmulateSimpleSerializer(T msg) - { - // TODO(jtattermusch): avoid the allocation by passing a thread-local instance - // This code will become unnecessary once gRPC C# library switches to using contextual (de)serializer. - var context = new EmulatedSerializationContext(); - this.contextualSerializer(msg, context); - return context.GetPayload(); - } - - // for backward compatibility, emulate the simple deserializer using the contextual one - private T EmulateSimpleDeserializer(byte[] payload) - { - // TODO(jtattermusch): avoid the allocation by passing a thread-local instance - // This code will become unnecessary once gRPC C# library switches to using contextual (de)serializer. - var context = new EmulatedDeserializationContext(payload); - return this.contextualDeserializer(context); - } - // for backward compatibility, emulate the contextual serializer using the simple one private void EmulateContextualSerializer(T message, SerializationContext context) { @@ -116,44 +99,6 @@ namespace Grpc.Core { return this.deserializer(context.PayloadAsNewBuffer()); } - - internal class EmulatedSerializationContext : SerializationContext - { - bool isComplete; - byte[] payload; - - public override void Complete(byte[] payload) - { - GrpcPreconditions.CheckState(!isComplete); - this.isComplete = true; - this.payload = payload; - } - - internal byte[] GetPayload() - { - return this.payload; - } - } - - internal class EmulatedDeserializationContext : DeserializationContext - { - readonly byte[] payload; - bool alreadyCalledPayloadAsNewBuffer; - - public EmulatedDeserializationContext(byte[] payload) - { - this.payload = GrpcPreconditions.CheckNotNull(payload); - } - - public override int PayloadLength => payload.Length; - - public override byte[] PayloadAsNewBuffer() - { - GrpcPreconditions.CheckState(!alreadyCalledPayloadAsNewBuffer); - alreadyCalledPayloadAsNewBuffer = true; - return payload; - } - } } /// diff --git a/src/csharp/Grpc.Core/SerializationContext.cs b/src/csharp/Grpc.Core/SerializationContext.cs index cf4d1595da..9aef2adbcd 100644 --- a/src/csharp/Grpc.Core/SerializationContext.cs +++ b/src/csharp/Grpc.Core/SerializationContext.cs @@ -16,6 +16,8 @@ #endregion +using System; + namespace Grpc.Core { /// @@ -29,6 +31,9 @@ namespace Grpc.Core /// payload which must not be accessed afterwards. /// /// the serialized form of current message - public abstract void Complete(byte[] payload); + public virtual void Complete(byte[] payload) + { + throw new NotImplementedException(); + } } } -- cgit v1.2.3