diff options
author | murgatroid99 <mlumish@google.com> | 2015-08-28 11:42:10 -0700 |
---|---|---|
committer | murgatroid99 <mlumish@google.com> | 2015-08-28 11:42:10 -0700 |
commit | 0d1625a6b47e3bd6af995d9a8c68fcc26ef9017b (patch) | |
tree | 25c249d6412edca5dc67bc3ef56f8bfb5ecc3207 /src | |
parent | 7d58abae60a829c5c763fbd5bf8aca85287f9d05 (diff) | |
parent | 956e411e31c97836702aac5675e9f509b2231426 (diff) |
Merge branch 'master' into node_error_code_compliance
Diffstat (limited to 'src')
28 files changed, 382 insertions, 110 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index fbedb35661..b78445595c 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -36,12 +36,12 @@ #include <stdio.h> #include <string.h> -#include "include/grpc/census.h" #include "src/core/census/rpc_stat_id.h" #include "src/core/channel/channel_stack.h" #include "src/core/channel/noop_filter.h" #include "src/core/statistics/census_interface.h" #include "src/core/statistics/census_rpc_stats.h" +#include <grpc/census.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice.h> diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 38c6052f9c..781db7b0d6 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -177,8 +177,10 @@ int grpc_chttp2_list_pop_writable_stream( grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WRITABLE); - *stream_global = &stream->global; - *stream_writing = &stream->writing; + if (r != 0) { + *stream_global = &stream->global; + *stream_writing = &stream->writing; + } return r; } @@ -210,7 +212,9 @@ int grpc_chttp2_list_pop_writing_stream( grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, GRPC_CHTTP2_LIST_WRITING); - *stream_writing = &stream->writing; + if (r != 0) { + *stream_writing = &stream->writing; + } return r; } @@ -230,8 +234,10 @@ int grpc_chttp2_list_pop_written_stream( grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, GRPC_CHTTP2_LIST_WRITTEN); - *stream_global = &stream->global; - *stream_writing = &stream->writing; + if (r != 0) { + *stream_global = &stream->global; + *stream_writing = &stream->writing; + } return r; } @@ -251,8 +257,10 @@ int grpc_chttp2_list_pop_parsing_seen_stream( grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_PARSING(transport_parsing), &stream, GRPC_CHTTP2_LIST_PARSING_SEEN); - *stream_global = &stream->global; - *stream_parsing = &stream->parsing; + if (r != 0) { + *stream_global = &stream->global; + *stream_parsing = &stream->parsing; + } return r; } @@ -270,7 +278,9 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY); - *stream_global = &stream->global; + if (r != 0) { + *stream_global = &stream->global; + } return r; } @@ -288,7 +298,9 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing( grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING); - *stream_global = &stream->global; + if (r != 0) { + *stream_global = &stream->global; + } return r; } @@ -306,7 +318,9 @@ int grpc_chttp2_list_pop_cancelled_waiting_for_writing( grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING); - *stream_global = &stream->global; + if (r != 0) { + *stream_global = &stream->global; + } return r; } @@ -326,8 +340,10 @@ int grpc_chttp2_list_pop_incoming_window_updated( grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED); - *stream_global = &stream->global; - *stream_parsing = &stream->parsing; + if (r != 0) { + *stream_global = &stream->global; + *stream_parsing = &stream->parsing; + } return r; } @@ -353,7 +369,9 @@ int grpc_chttp2_list_pop_read_write_state_changed( grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED); - *stream_global = &stream->global; + if (r != 0) { + *stream_global = &stream->global; + } return r; } diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 3fd21a2f5d..61638764a6 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -703,7 +703,7 @@ int grpc_mdstr_is_legal_header(grpc_mdstr *s) { int grpc_mdstr_is_legal_nonbin_header(grpc_mdstr *s) { static const gpr_uint8 legal_header_bits[256 / 8] = { - 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0x00, 0x00, 0x00, 0x00, 0xff, 0xef, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; return conforms_to(s, legal_header_bits); diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc index 8c571cbbaa..1dac960017 100644 --- a/src/cpp/client/create_channel.cc +++ b/src/cpp/client/create_channel.cc @@ -44,6 +44,11 @@ namespace grpc { class ChannelArguments; std::shared_ptr<Channel> CreateChannel( + const grpc::string& target, const std::shared_ptr<Credentials>& creds) { + return CreateCustomChannel(target, creds, ChannelArguments()); +} + +std::shared_ptr<Channel> CreateCustomChannel( const grpc::string& target, const std::shared_ptr<Credentials>& creds, const ChannelArguments& args) { ChannelArguments cp_args = args; @@ -57,4 +62,5 @@ std::shared_ptr<Channel> CreateChannel( NULL, GRPC_STATUS_INVALID_ARGUMENT, "Invalid credentials.")); } + } // namespace grpc diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index b571fe9025..f730936062 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -64,6 +64,7 @@ <Link>Version.cs</Link> </Compile> <Compile Include="ClientBaseTest.cs" /> + <Compile Include="MarshallingErrorsTest.cs" /> <Compile Include="ShutdownTest.cs" /> <Compile Include="Internal\AsyncCallTest.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> diff --git a/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs b/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs new file mode 100644 index 0000000000..83707e0c6d --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs @@ -0,0 +1,176 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class MarshallingErrorsTest + { + const string Host = "127.0.0.1"; + + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + var marshaller = new Marshaller<string>( + (str) => + { + if (str == "UNSERIALIZABLE_VALUE") + { + // Google.Protobuf throws exception inherited from IOException + throw new IOException("Error serializing the message."); + } + return System.Text.Encoding.UTF8.GetBytes(str); + }, + (payload) => + { + var s = System.Text.Encoding.UTF8.GetString(payload); + if (s == "UNPARSEABLE_VALUE") + { + // Google.Protobuf throws exception inherited from IOException + throw new IOException("Error parsing the message."); + } + return s; + }); + helper = new MockServiceHelper(Host, marshaller); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [TearDown] + public void Cleanup() + { + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + + [Test] + public void ResponseParsingError_UnaryResponse() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => + { + return Task.FromResult("UNPARSEABLE_VALUE"); + }); + + var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "REQUEST")); + Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode); + } + + [Test] + public void ResponseParsingError_StreamingResponse() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) => + { + await responseStream.WriteAsync("UNPARSEABLE_VALUE"); + await Task.Delay(10000); + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "REQUEST"); + var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext()); + Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode); + } + + [Test] + public void RequestParsingError_UnaryRequest() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => + { + return Task.FromResult("RESPONSE"); + }); + + var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "UNPARSEABLE_VALUE")); + // Spec doesn't define the behavior. With the current implementation server handler throws exception which results in StatusCode.Unknown. + Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode); + } + + [Test] + public async Task RequestParsingError_StreamingRequest() + { + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + Assert.Throws<IOException>(async () => await requestStream.MoveNext()); + return "RESPONSE"; + }); + + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall()); + await call.RequestStream.WriteAsync("UNPARSEABLE_VALUE"); + + Assert.AreEqual("RESPONSE", await call); + } + + [Test] + public void RequestSerializationError_BlockingUnary() + { + Assert.Throws<IOException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "UNSERIALIZABLE_VALUE")); + } + + [Test] + public void RequestSerializationError_AsyncUnary() + { + Assert.Throws<IOException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "UNSERIALIZABLE_VALUE")); + } + + [Test] + public async Task RequestSerializationError_ClientStreaming() + { + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + CollectionAssert.AreEqual(new [] {"A", "B"}, await requestStream.ToListAsync()); + return "RESPONSE"; + }); + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall()); + await call.RequestStream.WriteAsync("A"); + Assert.Throws<IOException>(async () => await call.RequestStream.WriteAsync("UNSERIALIZABLE_VALUE")); + await call.RequestStream.WriteAsync("B"); + await call.RequestStream.CompleteAsync(); + + Assert.AreEqual("RESPONSE", await call); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/MetadataTest.cs b/src/csharp/Grpc.Core.Tests/MetadataTest.cs index c00f945d6a..ddeb7d0926 100644 --- a/src/csharp/Grpc.Core.Tests/MetadataTest.cs +++ b/src/csharp/Grpc.Core.Tests/MetadataTest.cs @@ -75,6 +75,17 @@ namespace Grpc.Core.Tests } [Test] + public void AsciiEntry_KeyValidity() + { + new Metadata.Entry("ABC", "XYZ"); + new Metadata.Entry("0123456789abc", "XYZ"); + new Metadata.Entry("-abc", "XYZ"); + new Metadata.Entry("a_bc_", "XYZ"); + Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc[", "xyz")); + Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc/", "xyz")); + } + + [Test] public void Entry_ConstructionPreconditions() { Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry(null, "xyz")); diff --git a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs index bb69648d8b..765732c768 100644 --- a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs +++ b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs @@ -50,37 +50,14 @@ namespace Grpc.Core.Tests { public const string ServiceName = "tests.Test"; - public static readonly Method<string, string> UnaryMethod = new Method<string, string>( - MethodType.Unary, - ServiceName, - "Unary", - Marshallers.StringMarshaller, - Marshallers.StringMarshaller); - - public static readonly Method<string, string> ClientStreamingMethod = new Method<string, string>( - MethodType.ClientStreaming, - ServiceName, - "ClientStreaming", - Marshallers.StringMarshaller, - Marshallers.StringMarshaller); - - public static readonly Method<string, string> ServerStreamingMethod = new Method<string, string>( - MethodType.ServerStreaming, - ServiceName, - "ServerStreaming", - Marshallers.StringMarshaller, - Marshallers.StringMarshaller); - - public static readonly Method<string, string> DuplexStreamingMethod = new Method<string, string>( - MethodType.DuplexStreaming, - ServiceName, - "DuplexStreaming", - Marshallers.StringMarshaller, - Marshallers.StringMarshaller); - readonly string host; readonly ServerServiceDefinition serviceDefinition; + readonly Method<string, string> unaryMethod; + readonly Method<string, string> clientStreamingMethod; + readonly Method<string, string> serverStreamingMethod; + readonly Method<string, string> duplexStreamingMethod; + UnaryServerMethod<string, string> unaryHandler; ClientStreamingServerMethod<string, string> clientStreamingHandler; ServerStreamingServerMethod<string, string> serverStreamingHandler; @@ -89,15 +66,44 @@ namespace Grpc.Core.Tests Server server; Channel channel; - public MockServiceHelper(string host = null) + public MockServiceHelper(string host = null, Marshaller<string> marshaller = null) { this.host = host ?? "localhost"; + marshaller = marshaller ?? Marshallers.StringMarshaller; + + unaryMethod = new Method<string, string>( + MethodType.Unary, + ServiceName, + "Unary", + marshaller, + marshaller); + + clientStreamingMethod = new Method<string, string>( + MethodType.ClientStreaming, + ServiceName, + "ClientStreaming", + marshaller, + marshaller); + + serverStreamingMethod = new Method<string, string>( + MethodType.ServerStreaming, + ServiceName, + "ServerStreaming", + marshaller, + marshaller); + + duplexStreamingMethod = new Method<string, string>( + MethodType.DuplexStreaming, + ServiceName, + "DuplexStreaming", + marshaller, + marshaller); serviceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName) - .AddMethod(UnaryMethod, (request, context) => unaryHandler(request, context)) - .AddMethod(ClientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context)) - .AddMethod(ServerStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context)) - .AddMethod(DuplexStreamingMethod, (requestStream, responseStream, context) => duplexStreamingHandler(requestStream, responseStream, context)) + .AddMethod(unaryMethod, (request, context) => unaryHandler(request, context)) + .AddMethod(clientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context)) + .AddMethod(serverStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context)) + .AddMethod(duplexStreamingMethod, (requestStream, responseStream, context) => duplexStreamingHandler(requestStream, responseStream, context)) .Build(); var defaultStatus = new Status(StatusCode.Unknown, "Default mock implementation. Please provide your own."); @@ -155,22 +161,22 @@ namespace Grpc.Core.Tests public CallInvocationDetails<string, string> CreateUnaryCall(CallOptions options = default(CallOptions)) { - return new CallInvocationDetails<string, string>(channel, UnaryMethod, options); + return new CallInvocationDetails<string, string>(channel, unaryMethod, options); } public CallInvocationDetails<string, string> CreateClientStreamingCall(CallOptions options = default(CallOptions)) { - return new CallInvocationDetails<string, string>(channel, ClientStreamingMethod, options); + return new CallInvocationDetails<string, string>(channel, clientStreamingMethod, options); } public CallInvocationDetails<string, string> CreateServerStreamingCall(CallOptions options = default(CallOptions)) { - return new CallInvocationDetails<string, string>(channel, ServerStreamingMethod, options); + return new CallInvocationDetails<string, string>(channel, serverStreamingMethod, options); } public CallInvocationDetails<string, string> CreateDuplexStreamingCall(CallOptions options = default(CallOptions)) { - return new CallInvocationDetails<string, string>(channel, DuplexStreamingMethod, options); + return new CallInvocationDetails<string, string>(channel, duplexStreamingMethod, options); } public string Host diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index be5d611a53..e3b00781c6 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -322,6 +322,11 @@ namespace Grpc.Core.Internal details.Channel.RemoveCallReference(this); } + protected override bool IsClient + { + get { return true; } + } + private void Initialize(CompletionQueueSafeHandle cq) { var call = CreateNativeCall(cq); @@ -376,9 +381,17 @@ namespace Grpc.Core.Internal /// </summary> private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders) { + TResponse msg = default(TResponse); + var deserializeException = success ? TryDeserialize(receivedMessage, out msg) : null; + lock (myLock) { finished = true; + + if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK) + { + receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers); + } finishedStatus = receivedStatus; ReleaseResourcesIfPossible(); @@ -394,10 +407,6 @@ namespace Grpc.Core.Internal return; } - // TODO: handle deserialization error - TResponse msg; - TryDeserialize(receivedMessage, out msg); - unaryResponseTcs.SetResult(msg); } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 4d20394644..3e2c57c9b5 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -33,10 +33,12 @@ using System; using System.Diagnostics; +using System.IO; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; + using Grpc.Core.Internal; using Grpc.Core.Logging; using Grpc.Core.Utils; @@ -50,6 +52,7 @@ namespace Grpc.Core.Internal internal abstract class AsyncCallBase<TWrite, TRead> { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>(); + protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message."); readonly Func<TWrite, byte[]> serializer; readonly Func<byte[], TRead> deserializer; @@ -100,11 +103,10 @@ namespace Grpc.Core.Internal /// <summary> /// Requests cancelling the call with given status. /// </summary> - public void CancelWithStatus(Status status) + protected void CancelWithStatus(Status status) { lock (myLock) { - Preconditions.CheckState(started); cancelRequested = true; if (!disposed) @@ -177,6 +179,11 @@ namespace Grpc.Core.Internal return false; } + protected abstract bool IsClient + { + get; + } + private void ReleaseResources() { if (call != null) @@ -224,33 +231,31 @@ namespace Grpc.Core.Internal return serializer(msg); } - protected bool TrySerialize(TWrite msg, out byte[] payload) + protected Exception TrySerialize(TWrite msg, out byte[] payload) { try { payload = serializer(msg); - return true; + return null; } catch (Exception e) { - Logger.Error(e, "Exception occured while trying to serialize message"); payload = null; - return false; + return e; } } - protected bool TryDeserialize(byte[] payload, out TRead msg) + protected Exception TryDeserialize(byte[] payload, out TRead msg) { try { msg = deserializer(payload); - return true; + return null; } catch (Exception e) { - Logger.Error(e, "Exception occured while trying to deserialize message."); msg = default(TRead); - return false; + return e; } } @@ -319,6 +324,9 @@ namespace Grpc.Core.Internal /// </summary> protected void HandleReadFinished(bool success, byte[] receivedMessage) { + TRead msg = default(TRead); + var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null; + AsyncCompletionDelegate<TRead> origCompletionDelegate = null; lock (myLock) { @@ -331,23 +339,23 @@ namespace Grpc.Core.Internal readingDone = true; } + if (deserializeException != null && IsClient) + { + readingDone = true; + CancelWithStatus(DeserializeResponseFailureStatus); + } + ReleaseResourcesIfPossible(); } - // TODO: handle the case when error occured... + // TODO: handle the case when success==false - if (receivedMessage != null) - { - // TODO: handle deserialization error - TRead msg; - TryDeserialize(receivedMessage, out msg); - - FireCompletion(origCompletionDelegate, msg, null); - } - else + if (deserializeException != null && !IsClient) { - FireCompletion(origCompletionDelegate, default(TRead), null); + FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException)); + return; } + FireCompletion(origCompletionDelegate, msg, null); } } }
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 5c47251030..46ca459349 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -169,6 +169,11 @@ namespace Grpc.Core.Internal } } + protected override bool IsClient + { + get { return false; } + } + protected override void CheckReadingAllowed() { base.CheckReadingAllowed(); diff --git a/src/csharp/Grpc.Core/Marshaller.cs b/src/csharp/Grpc.Core/Marshaller.cs index f38cb0863f..3493d2d38f 100644 --- a/src/csharp/Grpc.Core/Marshaller.cs +++ b/src/csharp/Grpc.Core/Marshaller.cs @@ -39,7 +39,7 @@ namespace Grpc.Core /// <summary> /// Encapsulates the logic for serializing and deserializing messages. /// </summary> - public struct Marshaller<T> + public class Marshaller<T> { readonly Func<T, byte[]> serializer; readonly Func<byte[], T> deserializer; diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs index 99fe0b5478..21bdf4f114 100644 --- a/src/csharp/Grpc.Core/Metadata.cs +++ b/src/csharp/Grpc.Core/Metadata.cs @@ -33,8 +33,10 @@ using System; using System.Collections; using System.Collections.Generic; using System.Collections.Specialized; +using System.Globalization; using System.Runtime.InteropServices; using System.Text; +using System.Text.RegularExpressions; using Grpc.Core.Utils; @@ -188,6 +190,7 @@ namespace Grpc.Core public struct Entry { private static readonly Encoding Encoding = Encoding.ASCII; + private static readonly Regex ValidKeyRegex = new Regex("^[a-z0-9_-]+$"); readonly string key; readonly string value; @@ -320,7 +323,10 @@ namespace Grpc.Core private static string NormalizeKey(string key) { - return Preconditions.CheckNotNull(key, "key").ToLower(); + var normalized = Preconditions.CheckNotNull(key, "key").ToLower(CultureInfo.InvariantCulture); + Preconditions.CheckArgument(ValidKeyRegex.IsMatch(normalized), + "Metadata entry key not valid. Keys can only contain lowercase alphanumeric characters, underscores and hyphens."); + return normalized; } } } diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs index d8547758d2..e2975b5da9 100644 --- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs +++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs @@ -162,7 +162,7 @@ namespace Math.Tests { using (var call = client.Sum()) { - var numbers = new List<long> { 10, 20, 30 }.ConvertAll(n => new Num{ Num_ = n }); + var numbers = new List<long> { 10, 20, 30 }.ConvertAll(n => new Num { Num_ = n }); await call.RequestStream.WriteAllAsync(numbers); var result = await call.ResponseAsync; diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs index 95f742cc99..6c3a53bec0 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs @@ -88,7 +88,7 @@ namespace Grpc.HealthCheck.Tests [Test] public void ServiceDoesntExist() { - Assert.Throws(Is.TypeOf(typeof(RpcException)).And.Property("Status").Property("StatusCode").EqualTo(StatusCode.NotFound), () => client.Check(new HealthCheckRequest{ Host = "", Service = "nonexistent.service" })); + Assert.Throws(Is.TypeOf(typeof(RpcException)).And.Property("Status").Property("StatusCode").EqualTo(StatusCode.NotFound), () => client.Check(new HealthCheckRequest { Host = "", Service = "nonexistent.service" })); } // TODO(jtattermusch): add test with timeout once timeouts are supported diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs index 8de8645cd1..2097c0dc8c 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs @@ -101,7 +101,7 @@ namespace Grpc.HealthCheck.Tests private static HealthCheckResponse.Types.ServingStatus GetStatusHelper(HealthServiceImpl impl, string host, string service) { - return impl.Check(new HealthCheckRequest{ Host = host, Service = service}, null).Result.Status; + return impl.Check(new HealthCheckRequest { Host = host, Service = service }, null).Result.Status; } } } diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index ed51af1942..8343e54122 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -37,13 +37,12 @@ using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; +using Google.Apis.Auth.OAuth2; +using Google.Protobuf; using Grpc.Auth; using Grpc.Core; using Grpc.Core.Utils; using Grpc.Testing; -using Google.Protobuf; -using Google.Apis.Auth.OAuth2; - using NUnit.Framework; namespace Grpc.IntegrationTesting diff --git a/src/csharp/README.md b/src/csharp/README.md index bb5e165986..30523b3bd2 100644 --- a/src/csharp/README.md +++ b/src/csharp/README.md @@ -19,7 +19,7 @@ Usage: Windows That will also pull all the transitive dependencies (including the native libraries that gRPC C# is internally using). -- Helloworld project example can be found in https://github.com/grpc/grpc-common/tree/master/csharp. +- Helloworld project example can be found in https://github.com/grpc/grpc/tree/master/examples/csharp. Usage: Linux (Mono) -------------- @@ -50,7 +50,7 @@ Usage: Linux (Mono) - Add NuGet package `Grpc` as a dependency (Project -> Add NuGet packages). -- Helloworld project example can be found in https://github.com/grpc/grpc-common/tree/master/csharp. +- Helloworld project example can be found in https://github.com/grpc/grpc/tree/master/examples/csharp. Usage: MacOS (Mono) -------------- @@ -73,7 +73,7 @@ Usage: MacOS (Mono) - *You will be able to build your project in Xamarin Studio, but to run or test it, you will need to run it under 64-bit version of Mono.* -- Helloworld project example can be found in https://github.com/grpc/grpc-common/tree/master/csharp. +- Helloworld project example can be found in https://github.com/grpc/grpc/tree/master/examples/csharp. Building: Windows ----------------- diff --git a/src/objective-c/README.md b/src/objective-c/README.md index e997b76d14..6c27657def 100644 --- a/src/objective-c/README.md +++ b/src/objective-c/README.md @@ -30,7 +30,7 @@ proceed. ## Write your API declaration in proto format For this you can consult the [Protocol Buffers][]' official documentation, or learn from a quick -example [here](https://github.com/grpc/grpc-common#defining-a-service). +example [here](https://github.com/grpc/grpc/tree/master/examples#defining-a-service). <a name="cocoapods"></a> ## Integrate a proto library in your project diff --git a/src/objective-c/tests/LocalClearTextTests.m b/src/objective-c/tests/LocalClearTextTests.m index 4317614dd9..d01fe91afa 100644 --- a/src/objective-c/tests/LocalClearTextTests.m +++ b/src/objective-c/tests/LocalClearTextTests.m @@ -42,7 +42,7 @@ #import <RxLibrary/GRXWriter+Immediate.h> // These tests require a gRPC "RouteGuide" sample server to be running locally. You can compile and -// run one by following the instructions here: https://github.com/grpc/grpc-common/blob/master/cpp/cpptutorial.md#try-it-out +// run one by following the instructions here: https://github.com/grpc/grpc/blob/master/examples/cpp/cpptutorial.md#try-it-out // Be sure to have the C gRPC library installed in your system (for example, by having followed the // instructions at https://github.com/grpc/homebrew-grpc diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php index 287621d930..a368dd4ee0 100644 --- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php +++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php @@ -47,6 +47,10 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase { $this->assertTrue(self::$client->waitForReady(250000)); } + public function testGetTarget() { + $this->assertTrue(is_string(self::$client->getTarget())); + } + public function testSimpleRequest() { $div_arg = new math\DivArgs(); $div_arg->setDividend(7); diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php index 376d306da0..bd15ee4303 100755 --- a/src/php/tests/interop/interop_client.php +++ b/src/php/tests/interop/interop_client.php @@ -332,11 +332,7 @@ if (in_array($args['test_case'], array( $opts['update_metadata'] = $auth->getUpdateMetadataFunc(); } -$internal_stub = new Grpc\BaseStub($server_address, $opts); -hardAssert(is_string($internal_stub->getTarget()), - 'Unexpected target URI value'); - -$stub = new grpc\testing\TestServiceClient($internal_stub); +$stub = new grpc\testing\TestServiceClient($server_address, $opts); echo "Connecting to $server_address\n"; echo "Running test case $args[test_case]\n"; @@ -372,6 +368,11 @@ switch ($args['test_case']) { case 'jwt_token_creds': jwtTokenCreds($stub, $args); break; + case 'cancel_after_begin': + // Currently unimplementable with the current API design + // Specifically, in the ClientStreamingCall->start() method, the + // messages are sent immediately after metadata is sent. There is + // currently no way to cancel before messages are sent. default: exit(1); } diff --git a/src/python/grpcio/grpc/_adapter/_c/types/server_credentials.c b/src/python/grpcio/grpc/_adapter/_c/types/server_credentials.c index 2ba855e76c..df51a99b6a 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/server_credentials.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/server_credentials.c @@ -99,11 +99,13 @@ ServerCredentials *pygrpc_ServerCredentials_ssl( const char *root_certs; PyObject *py_key_cert_pairs; grpc_ssl_pem_key_cert_pair *key_cert_pairs; + int force_client_auth; size_t num_key_cert_pairs; size_t i; - static char *keywords[] = {"root_certs", "key_cert_pairs", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "zO:ssl", keywords, - &root_certs, &py_key_cert_pairs)) { + static char *keywords[] = { + "root_certs", "key_cert_pairs", "force_client_auth", NULL}; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "zOi:ssl", keywords, + &root_certs, &py_key_cert_pairs, &force_client_auth)) { return NULL; } if (!PyList_Check(py_key_cert_pairs)) { @@ -128,11 +130,8 @@ ServerCredentials *pygrpc_ServerCredentials_ssl( } self = (ServerCredentials *)type->tp_alloc(type, 0); - /* TODO: Add a force_client_auth parameter in the python object and pass it - here as the last arg. */ self->c_creds = grpc_ssl_server_credentials_create( - root_certs, key_cert_pairs, num_key_cert_pairs, 0, NULL); + root_certs, key_cert_pairs, num_key_cert_pairs, force_client_auth, NULL); gpr_free(key_cert_pairs); return self; } - diff --git a/src/python/grpcio/grpc/_adapter/_intermediary_low.py b/src/python/grpcio/grpc/_adapter/_intermediary_low.py index e7bf9dc462..06358e72bc 100644 --- a/src/python/grpcio/grpc/_adapter/_intermediary_low.py +++ b/src/python/grpcio/grpc/_adapter/_intermediary_low.py @@ -255,5 +255,6 @@ class ClientCredentials(object): class ServerCredentials(object): """Adapter from old _low.ServerCredentials interface to new _low.ServerCredentials.""" - def __init__(self, root_credentials, pair_sequence): - self._internal = _low.ServerCredentials.ssl(root_credentials, list(pair_sequence)) + def __init__(self, root_credentials, pair_sequence, force_client_auth): + self._internal = _low.ServerCredentials.ssl( + root_credentials, list(pair_sequence), force_client_auth) diff --git a/src/python/grpcio/grpc/_adapter/fore.py b/src/python/grpcio/grpc/_adapter/fore.py index 7d88bda263..daa41e8bde 100644 --- a/src/python/grpcio/grpc/_adapter/fore.py +++ b/src/python/grpcio/grpc/_adapter/fore.py @@ -288,7 +288,7 @@ class ForeLink(base_interfaces.ForeLink, activated.Activated): self._port = self._server.add_http2_addr(address) else: server_credentials = _low.ServerCredentials( - self._root_certificates, self._key_chain_pairs) + self._root_certificates, self._key_chain_pairs, False) self._server = _low.Server(self._completion_queue) self._port = self._server.add_secure_http2_addr( address, server_credentials) diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py index 43c4c0e80c..10634e43b5 100644 --- a/src/python/grpcio/grpc/_links/service.py +++ b/src/python/grpcio/grpc/_links/service.py @@ -366,10 +366,10 @@ class ServiceLink(links.Link): """Adds a port on which to service RPCs after this link has been started. Args: - port: The port on which to service RPCs, or zero to request that a port be - automatically selected and used. - server_credentials: A ServerCredentials object, or None for insecure - service. + port: The port on which to service RPCs, or zero to request that a port + be automatically selected and used. + server_credentials: An _intermediary_low.ServerCredentials object, or + None for insecure service. Returns: A port on which RPCs will be serviced after this link has been started. diff --git a/src/python/grpcio_test/grpc_interop/client.py b/src/python/grpcio_test/grpc_interop/client.py index 2dd2103cbe..36afe6c096 100644 --- a/src/python/grpcio_test/grpc_interop/client.py +++ b/src/python/grpcio_test/grpc_interop/client.py @@ -70,7 +70,13 @@ def _oauth_access_token(args): def _stub(args): if args.oauth_scope: - metadata_transformer = lambda x: [('Authorization', 'Bearer %s' % _oauth_access_token(args))] + if args.test_case == 'oauth2_auth_token': + access_token = _oauth_access_token(args) + metadata_transformer = lambda x: [ + ('Authorization', 'Bearer %s' % access_token)] + else: + metadata_transformer = lambda x: [ + ('Authorization', 'Bearer %s' % _oauth_access_token(args))] else: metadata_transformer = lambda x: [] if args.use_tls: diff --git a/src/python/grpcio_test/grpc_interop/methods.py b/src/python/grpcio_test/grpc_interop/methods.py index 7a831f3cbd..642458e892 100644 --- a/src/python/grpcio_test/grpc_interop/methods.py +++ b/src/python/grpcio_test/grpc_interop/methods.py @@ -360,6 +360,19 @@ def _service_account_creds(stub, args): (response.oauth_scope, args.oauth_scope)) +def _oauth2_auth_token(stub, args): + json_key_filename = os.environ[ + oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] + wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] + response = _large_unary_common_behavior(stub, True, True) + if wanted_email != response.username: + raise ValueError( + 'expected username %s, got %s' % (wanted_email, response.username)) + if args.oauth_scope.find(response.oauth_scope) == -1: + raise ValueError( + 'expected to find oauth scope "%s" in received "%s"' % + (response.oauth_scope, args.oauth_scope)) + @enum.unique class TestCase(enum.Enum): EMPTY_UNARY = 'empty_unary' @@ -371,6 +384,7 @@ class TestCase(enum.Enum): CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' COMPUTE_ENGINE_CREDS = 'compute_engine_creds' SERVICE_ACCOUNT_CREDS = 'service_account_creds' + OAUTH2_AUTH_TOKEN = 'oauth2_auth_token' TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' def test_interoperability(self, stub, args): @@ -394,5 +408,7 @@ class TestCase(enum.Enum): _compute_engine_creds(stub, args) elif self is TestCase.SERVICE_ACCOUNT_CREDS: _service_account_creds(stub, args) + elif self is TestCase.OAUTH2_AUTH_TOKEN: + _oauth2_auth_token(stub, args) else: raise NotImplementedError('Test case "%s" not implemented!' % self.name) |