aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core')
-rw-r--r--src/csharp/Grpc.Core/CallInvocationDetails.cs (renamed from src/csharp/Grpc.Core/Call.cs)56
-rw-r--r--src/csharp/Grpc.Core/CallOptions.cs118
-rw-r--r--src/csharp/Grpc.Core/Calls.cs48
-rw-r--r--src/csharp/Grpc.Core/Channel.cs110
-rw-r--r--src/csharp/Grpc.Core/ChannelOptions.cs4
-rw-r--r--src/csharp/Grpc.Core/ChannelState.cs69
-rw-r--r--src/csharp/Grpc.Core/ClientBase.cs10
-rw-r--r--src/csharp/Grpc.Core/CompressionLevel.cs63
-rw-r--r--src/csharp/Grpc.Core/ContextPropagationToken.cs139
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj15
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.nuspec1
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs12
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamReader.cs2
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamWriter.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs116
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs10
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs35
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs41
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs37
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs23
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs17
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs31
-rw-r--r--src/csharp/Grpc.Core/KeyCertificatePair.cs1
-rw-r--r--src/csharp/Grpc.Core/Metadata.cs11
-rw-r--r--src/csharp/Grpc.Core/Method.cs28
-rw-r--r--src/csharp/Grpc.Core/Server.cs175
-rw-r--r--src/csharp/Grpc.Core/ServerCallContext.cs69
-rw-r--r--src/csharp/Grpc.Core/ServerCredentials.cs28
-rw-r--r--src/csharp/Grpc.Core/ServerPort.cs120
-rw-r--r--src/csharp/Grpc.Core/ServerServiceDefinition.cs12
-rw-r--r--src/csharp/Grpc.Core/Version.cs2
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs2
-rw-r--r--src/csharp/Grpc.Core/WriteOptions.cs82
-rw-r--r--src/csharp/Grpc.Core/packages.config1
35 files changed, 1240 insertions, 263 deletions
diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/CallInvocationDetails.cs
index 94c5e26082..eb23a3a209 100644
--- a/src/csharp/Grpc.Core/Call.cs
+++ b/src/csharp/Grpc.Core/CallInvocationDetails.cs
@@ -38,30 +38,30 @@ using Grpc.Core.Utils;
namespace Grpc.Core
{
/// <summary>
- /// Abstraction of a call to be invoked on a client.
+ /// Details about a client-side call to be invoked.
/// </summary>
- public class Call<TRequest, TResponse>
+ public class CallInvocationDetails<TRequest, TResponse>
{
- readonly string name;
+ readonly Channel channel;
+ readonly string method;
+ readonly string host;
readonly Marshaller<TRequest> requestMarshaller;
readonly Marshaller<TResponse> responseMarshaller;
- readonly Channel channel;
- readonly Metadata headers;
- readonly DateTime deadline;
+ readonly CallOptions options;
- public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers)
- : this(serviceName, method, channel, headers, DateTime.MaxValue)
+ public CallInvocationDetails(Channel channel, Method<TRequest, TResponse> method, CallOptions options) :
+ this(channel, method.FullName, null, method.RequestMarshaller, method.ResponseMarshaller, options)
{
}
- public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers, DateTime deadline)
+ public CallInvocationDetails(Channel channel, string method, string host, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller, CallOptions options)
{
- this.name = method.GetFullName(serviceName);
- this.requestMarshaller = method.RequestMarshaller;
- this.responseMarshaller = method.ResponseMarshaller;
this.channel = Preconditions.CheckNotNull(channel);
- this.headers = Preconditions.CheckNotNull(headers);
- this.deadline = deadline;
+ this.method = Preconditions.CheckNotNull(method);
+ this.host = host;
+ this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller);
+ this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller);
+ this.options = Preconditions.CheckNotNull(options);
}
public Channel Channel
@@ -72,49 +72,43 @@ namespace Grpc.Core
}
}
- /// <summary>
- /// Full methods name including the service name.
- /// </summary>
- public string Name
+ public string Method
{
get
{
- return name;
+ return this.method;
}
}
- /// <summary>
- /// Headers to send at the beginning of the call.
- /// </summary>
- public Metadata Headers
+ public string Host
{
get
{
- return headers;
+ return this.host;
}
}
- public DateTime Deadline
+ public Marshaller<TRequest> RequestMarshaller
{
get
{
- return this.deadline;
+ return this.requestMarshaller;
}
}
- public Marshaller<TRequest> RequestMarshaller
+ public Marshaller<TResponse> ResponseMarshaller
{
get
{
- return requestMarshaller;
+ return this.responseMarshaller;
}
}
-
- public Marshaller<TResponse> ResponseMarshaller
+
+ public CallOptions Options
{
get
{
- return responseMarshaller;
+ return options;
}
}
}
diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs
new file mode 100644
index 0000000000..0d82b5a28e
--- /dev/null
+++ b/src/csharp/Grpc.Core/CallOptions.cs
@@ -0,0 +1,118 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading;
+
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Options for calls made by client.
+ /// </summary>
+ public class CallOptions
+ {
+ readonly Metadata headers;
+ readonly DateTime deadline;
+ readonly CancellationToken cancellationToken;
+ readonly WriteOptions writeOptions;
+ readonly ContextPropagationToken propagationToken;
+
+ /// <summary>
+ /// Creates a new instance of <c>CallOptions</c>.
+ /// </summary>
+ /// <param name="headers">Headers to be sent with the call.</param>
+ /// <param name="deadline">Deadline for the call to finish. null means no deadline.</param>
+ /// <param name="cancellationToken">Can be used to request cancellation of the call.</param>
+ /// <param name="writeOptions">Write options that will be used for this call.</param>
+ /// <param name="propagationToken">Context propagation token obtained from <see cref="ServerCallContext"/>.</param>
+ public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken? cancellationToken = null,
+ WriteOptions writeOptions = null, ContextPropagationToken propagationToken = null)
+ {
+ // TODO(jtattermusch): consider only creating metadata object once it's really needed.
+ this.headers = headers ?? new Metadata();
+ this.deadline = deadline ?? (propagationToken != null ? propagationToken.Deadline : DateTime.MaxValue);
+ this.cancellationToken = cancellationToken ?? (propagationToken != null ? propagationToken.CancellationToken : CancellationToken.None);
+ this.writeOptions = writeOptions;
+ this.propagationToken = propagationToken;
+ }
+
+ /// <summary>
+ /// Headers to send at the beginning of the call.
+ /// </summary>
+ public Metadata Headers
+ {
+ get { return headers; }
+ }
+
+ /// <summary>
+ /// Call deadline.
+ /// </summary>
+ public DateTime Deadline
+ {
+ get { return deadline; }
+ }
+
+ /// <summary>
+ /// Token that can be used for cancelling the call.
+ /// </summary>
+ public CancellationToken CancellationToken
+ {
+ get { return cancellationToken; }
+ }
+
+ /// <summary>
+ /// Write options that will be used for this call.
+ /// </summary>
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return this.writeOptions;
+ }
+ }
+
+ /// <summary>
+ /// Token for propagating parent call context.
+ /// </summary>
+ public ContextPropagationToken PropagationToken
+ {
+ get
+ {
+ return this.propagationToken;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index 054fc27491..00a8cabf82 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -43,70 +43,52 @@ namespace Grpc.Core
/// </summary>
public static class Calls
{
- public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
+ public static TResponse BlockingUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req)
where TRequest : class
where TResponse : class
{
- var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- // TODO(jtattermusch): this gives a race that cancellation can be requested before the call even starts.
- RegisterCancellationCallback(asyncCall, token);
- return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers, call.Deadline);
+ var asyncCall = new AsyncCall<TRequest, TResponse>(call);
+ return asyncCall.UnaryCall(req);
}
- public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
+ public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req)
where TRequest : class
where TResponse : class
{
- var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline));
- var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers, call.Deadline);
- RegisterCancellationCallback(asyncCall, token);
+ var asyncCall = new AsyncCall<TRequest, TResponse>(call);
+ var asyncResult = asyncCall.UnaryCallAsync(req);
return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
- public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
+ public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req)
where TRequest : class
where TResponse : class
{
- var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline));
- asyncCall.StartServerStreamingCall(req, call.Headers, call.Deadline);
- RegisterCancellationCallback(asyncCall, token);
+ var asyncCall = new AsyncCall<TRequest, TResponse>(call);
+ asyncCall.StartServerStreamingCall(req);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
- public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
+ public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call)
where TRequest : class
where TResponse : class
{
- var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline));
- var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers, call.Deadline);
- RegisterCancellationCallback(asyncCall, token);
+ var asyncCall = new AsyncCall<TRequest, TResponse>(call);
+ var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
- public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
+ public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call)
where TRequest : class
where TResponse : class
{
- var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline));
- asyncCall.StartDuplexStreamingCall(call.Headers, call.Deadline);
- RegisterCancellationCallback(asyncCall, token);
+ var asyncCall = new AsyncCall<TRequest, TResponse>(call);
+ asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
-
- private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token)
- {
- if (token.CanBeCanceled)
- {
- token.Register(() => asyncCall.Cancel());
- }
- }
}
}
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 18e6f2fda5..9273ea4582 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -37,6 +37,8 @@ using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
+using Grpc.Core.Logging;
+using Grpc.Core.Utils;
namespace Grpc.Core
{
@@ -45,21 +47,23 @@ namespace Grpc.Core
/// </summary>
public class Channel : IDisposable
{
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>();
+
readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle;
readonly List<ChannelOption> options;
- readonly string target;
bool disposed;
/// <summary>
/// Creates a channel that connects to a specific host.
- /// Port will default to 80 for an unsecure channel and to 443 a secure channel.
+ /// Port will default to 80 for an unsecure channel and to 443 for a secure channel.
/// </summary>
- /// <param name="host">The DNS name of IP address of the host.</param>
+ /// <param name="host">The name or IP address of the host.</param>
/// <param name="credentials">Credentials to secure the channel.</param>
/// <param name="options">Channel options.</param>
public Channel(string host, Credentials credentials, IEnumerable<ChannelOption> options = null)
{
+ Preconditions.CheckNotNull(host);
this.environment = GrpcEnvironment.GetInstance();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
@@ -76,14 +80,13 @@ namespace Grpc.Core
this.handle = ChannelSafeHandle.CreateInsecure(host, nativeChannelArgs);
}
}
- this.target = GetOverridenTarget(host, this.options);
}
/// <summary>
/// Creates a channel that connects to a specific host and port.
/// </summary>
- /// <param name="host">DNS name or IP address</param>
- /// <param name="port">the port</param>
+ /// <param name="host">The name or IP address of the host.</param>
+ /// <param name="port">The port.</param>
/// <param name="credentials">Credentials to secure the channel.</param>
/// <param name="options">Channel options.</param>
public Channel(string host, int port, Credentials credentials, IEnumerable<ChannelOption> options = null) :
@@ -91,41 +94,87 @@ namespace Grpc.Core
{
}
- public void Dispose()
+ /// <summary>
+ /// Gets current connectivity state of this channel.
+ /// </summary>
+ public ChannelState State
{
- Dispose(true);
- GC.SuppressFinalize(this);
+ get
+ {
+ return handle.CheckConnectivityState(false);
+ }
}
- internal string Target
+ /// <summary>
+ /// Returned tasks completes once channel state has become different from
+ /// given lastObservedState.
+ /// If deadline is reached or and error occurs, returned task is cancelled.
+ /// </summary>
+ public Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)
{
- get
+ Preconditions.CheckArgument(lastObservedState != ChannelState.FatalFailure,
+ "FatalFailure is a terminal state. No further state changes can occur.");
+ var tcs = new TaskCompletionSource<object>();
+ var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture;
+ var handler = new BatchCompletionDelegate((success, ctx) =>
{
- return target;
- }
+ if (success)
+ {
+ tcs.SetResult(null);
+ }
+ else
+ {
+ tcs.SetCanceled();
+ }
+ });
+ handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler);
+ return tcs.Task;
}
- internal ChannelSafeHandle Handle
+ /// <summary> Address of the remote endpoint in URI format.</summary>
+ public string Target
{
get
{
- return this.handle;
+ return handle.GetTarget();
}
}
- internal CompletionQueueSafeHandle CompletionQueue
+ /// <summary>
+ /// Allows explicitly requesting channel to connect without starting an RPC.
+ /// Returned task completes once state Ready was seen. If the deadline is reached,
+ /// or channel enters the FatalFailure state, the task is cancelled.
+ /// There is no need to call this explicitly unless your use case requires that.
+ /// Starting an RPC on a new channel will request connection implicitly.
+ /// </summary>
+ public async Task ConnectAsync(DateTime? deadline = null)
{
- get
+ var currentState = handle.CheckConnectivityState(true);
+ while (currentState != ChannelState.Ready)
{
- return this.environment.CompletionQueue;
+ if (currentState == ChannelState.FatalFailure)
+ {
+ throw new OperationCanceledException("Channel has reached FatalFailure state.");
+ }
+ await WaitForStateChangedAsync(currentState, deadline);
+ currentState = handle.CheckConnectivityState(false);
}
}
- internal CompletionRegistry CompletionRegistry
+ /// <summary>
+ /// Destroys the underlying channel.
+ /// </summary>
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ internal ChannelSafeHandle Handle
{
get
{
- return this.environment.CompletionRegistry;
+ return this.handle;
}
}
@@ -159,26 +208,5 @@ namespace Grpc.Core
// TODO(jtattermusch): it would be useful to also provide .NET/mono version.
return string.Format("grpc-csharp/{0}", VersionInfo.CurrentVersion);
}
-
- /// <summary>
- /// Look for SslTargetNameOverride option and return its value instead of originalTarget
- /// if found.
- /// </summary>
- private static string GetOverridenTarget(string originalTarget, IEnumerable<ChannelOption> options)
- {
- if (options == null)
- {
- return originalTarget;
- }
- foreach (var option in options)
- {
- if (option.Type == ChannelOption.OptionType.String
- && option.Name == ChannelOptions.SslTargetNameOverride)
- {
- return option.StringValue;
- }
- }
- return originalTarget;
- }
}
}
diff --git a/src/csharp/Grpc.Core/ChannelOptions.cs b/src/csharp/Grpc.Core/ChannelOptions.cs
index 9fe03d2805..1e0f90287a 100644
--- a/src/csharp/Grpc.Core/ChannelOptions.cs
+++ b/src/csharp/Grpc.Core/ChannelOptions.cs
@@ -30,7 +30,6 @@
#endregion
using System;
using System.Collections.Generic;
-using System.Collections.Immutable;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@@ -135,6 +134,9 @@ namespace Grpc.Core
/// <summary>Initial sequence number for http2 transports</summary>
public const string Http2InitialSequenceNumber = "grpc.http2.initial_sequence_number";
+ /// <summary>Default authority for calls.</summary>
+ public const string DefaultAuthority = "grpc.default_authority";
+
/// <summary>Primary user agent: goes at the start of the user-agent metadata</summary>
public const string PrimaryUserAgentString = "grpc.primary_user_agent";
diff --git a/src/csharp/Grpc.Core/ChannelState.cs b/src/csharp/Grpc.Core/ChannelState.cs
new file mode 100644
index 0000000000..d293b98f75
--- /dev/null
+++ b/src/csharp/Grpc.Core/ChannelState.cs
@@ -0,0 +1,69 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Connectivity state of a channel.
+ /// Based on grpc_connectivity_state from grpc/grpc.h
+ /// </summary>
+ public enum ChannelState
+ {
+ /// <summary>
+ /// Channel is idle
+ /// </summary>
+ Idle,
+
+ /// <summary>
+ /// Channel is connecting
+ /// </summary>
+ Connecting,
+
+ /// <summary>
+ /// Channel is ready for work
+ /// </summary>
+ Ready,
+
+ /// <summary>
+ /// Channel has seen a failure but expects to recover
+ /// </summary>
+ TransientFailure,
+
+ /// <summary>
+ /// Channel has seen a failure that it cannot recover from
+ /// </summary>
+ FatalFailure
+ }
+}
diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs
index fd3473128a..88494bb4ac 100644
--- a/src/csharp/Grpc.Core/ClientBase.cs
+++ b/src/csharp/Grpc.Core/ClientBase.cs
@@ -76,19 +76,17 @@ namespace Grpc.Core
/// <summary>
/// Creates a new call to given method.
/// </summary>
- protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method, Metadata metadata, DateTime? deadline)
+ protected CallInvocationDetails<TRequest, TResponse> CreateCall<TRequest, TResponse>(Method<TRequest, TResponse> method, CallOptions options)
where TRequest : class
where TResponse : class
{
var interceptor = HeaderInterceptor;
if (interceptor != null)
{
- metadata = metadata ?? new Metadata();
- interceptor(metadata);
- metadata.Freeze();
+ interceptor(options.Headers);
+ options.Headers.Freeze();
}
- return new Call<TRequest, TResponse>(serviceName, method, channel,
- metadata ?? Metadata.Empty, deadline ?? DateTime.MaxValue);
+ return new CallInvocationDetails<TRequest, TResponse>(channel, method, options);
}
}
}
diff --git a/src/csharp/Grpc.Core/CompressionLevel.cs b/src/csharp/Grpc.Core/CompressionLevel.cs
new file mode 100644
index 0000000000..399652b85e
--- /dev/null
+++ b/src/csharp/Grpc.Core/CompressionLevel.cs
@@ -0,0 +1,63 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Compression level based on grpc_compression_level from grpc/compression.h
+ /// </summary>
+ public enum CompressionLevel
+ {
+ /// <summary>
+ /// No compression.
+ /// </summary>
+ None = 0,
+
+ /// <summary>
+ /// Low compression.
+ /// </summary>
+ Low,
+
+ /// <summary>
+ /// Medium compression.
+ /// </summary>
+ Medium,
+
+ /// <summary>
+ /// High compression.
+ /// </summary>
+ High,
+ }
+}
diff --git a/src/csharp/Grpc.Core/ContextPropagationToken.cs b/src/csharp/Grpc.Core/ContextPropagationToken.cs
new file mode 100644
index 0000000000..b6ea5115a4
--- /dev/null
+++ b/src/csharp/Grpc.Core/ContextPropagationToken.cs
@@ -0,0 +1,139 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading;
+
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Token for propagating context of server side handlers to child calls.
+ /// In situations when a backend is making calls to another backend,
+ /// it makes sense to propagate properties like deadline and cancellation
+ /// token of the server call to the child call.
+ /// C core provides some other contexts (like tracing context) that
+ /// are not accessible to C# layer, but this token still allows propagating them.
+ /// </summary>
+ public class ContextPropagationToken
+ {
+ /// <summary>
+ /// Default propagation mask used by C core.
+ /// </summary>
+ const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff;
+
+ /// <summary>
+ /// Default propagation mask used by C# - we want to propagate deadline
+ /// and cancellation token by our own means.
+ /// </summary>
+ internal const ContextPropagationFlags DefaultMask = DefaultCoreMask
+ & ~ContextPropagationFlags.Deadline & ~ContextPropagationFlags.Cancellation;
+
+ readonly CallSafeHandle parentCall;
+ readonly DateTime deadline;
+ readonly CancellationToken cancellationToken;
+ readonly ContextPropagationOptions options;
+
+ internal ContextPropagationToken(CallSafeHandle parentCall, DateTime deadline, CancellationToken cancellationToken, ContextPropagationOptions options)
+ {
+ this.parentCall = Preconditions.CheckNotNull(parentCall);
+ this.deadline = deadline;
+ this.cancellationToken = cancellationToken;
+ this.options = options ?? ContextPropagationOptions.Default;
+ }
+
+ internal CallSafeHandle ParentCall
+ {
+ get
+ {
+ return this.parentCall;
+ }
+ }
+
+ internal DateTime Deadline
+ {
+ get
+ {
+ return this.deadline;
+ }
+ }
+
+ internal CancellationToken CancellationToken
+ {
+ get
+ {
+ return this.cancellationToken;
+ }
+ }
+
+ internal ContextPropagationOptions Options
+ {
+ get
+ {
+ return this.options;
+ }
+ }
+
+ internal bool IsPropagateDeadline
+ {
+ get { return false; }
+ }
+
+ internal bool IsPropagateCancellation
+ {
+ get { return false; }
+ }
+ }
+
+ /// <summary>
+ /// Options for <see cref="ContextPropagationToken"/>.
+ /// </summary>
+ public class ContextPropagationOptions
+ {
+ public static readonly ContextPropagationOptions Default = new ContextPropagationOptions();
+ }
+
+ /// <summary>
+ /// Context propagation flags from grpc/grpc.h.
+ /// </summary>
+ [Flags]
+ internal enum ContextPropagationFlags
+ {
+ Deadline = 1,
+ CensusStatsContext = 2,
+ CensusTracingContext = 4,
+ Cancellation = 8
+ }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 940a6b8ac0..e535c47f55 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -44,9 +44,6 @@
<Reference Include="System.Interactive.Async">
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
- <Reference Include="System.Collections.Immutable">
- <HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
- </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncDuplexStreamingCall.cs" />
@@ -55,11 +52,11 @@
<Compile Include="IServerStreamWriter.cs" />
<Compile Include="IAsyncStreamWriter.cs" />
<Compile Include="IAsyncStreamReader.cs" />
+ <Compile Include="ServerPort.cs" />
<Compile Include="Version.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RpcException.cs" />
<Compile Include="Calls.cs" />
- <Compile Include="Call.cs" />
<Compile Include="AsyncClientStreamingCall.cs" />
<Compile Include="GrpcEnvironment.cs" />
<Compile Include="Status.cs" />
@@ -115,6 +112,12 @@
<Compile Include="Logging\ILogger.cs" />
<Compile Include="Logging\ConsoleLogger.cs" />
<Compile Include="Internal\NativeLogRedirector.cs" />
+ <Compile Include="ChannelState.cs" />
+ <Compile Include="CallInvocationDetails.cs" />
+ <Compile Include="CallOptions.cs" />
+ <Compile Include="CompressionLevel.cs" />
+ <Compile Include="WriteOptions.cs" />
+ <Compile Include="ContextPropagationToken.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />
@@ -145,7 +148,5 @@
</Target>
<Import Project="..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets" Condition="Exists('..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets')" />
<Import Project="..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets" Condition="Exists('..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets')" />
- <ItemGroup>
- <Folder Include="Logging\" />
- </ItemGroup>
+ <ItemGroup />
</Project> \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec
index 086776f69d..fe49efc7ec 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.nuspec
+++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec
@@ -15,7 +15,6 @@
<copyright>Copyright 2015, Google Inc.</copyright>
<tags>gRPC RPC Protocol HTTP/2</tags>
<dependencies>
- <dependency id="System.Collections.Immutable" version="1.1.36" />
<dependency id="Ix-Async" version="1.2.3" />
<dependency id="grpc.native.csharp_ext" version="$GrpcNativeCsharpExtVersion$" />
</dependencies>
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 034a66be3c..1bb83c9962 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -53,6 +53,9 @@ namespace Grpc.Core
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_shutdown();
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern IntPtr grpcsharp_version_string(); // returns not-owned const char*
+
static object staticLock = new object();
static GrpcEnvironment instance;
@@ -164,6 +167,15 @@ namespace Grpc.Core
}
/// <summary>
+ /// Gets version of gRPC C core.
+ /// </summary>
+ internal static string GetCoreVersionString()
+ {
+ var ptr = grpcsharp_version_string(); // the pointer is not owned
+ return Marshal.PtrToStringAnsi(ptr);
+ }
+
+ /// <summary>
/// Shuts down this environment.
/// </summary>
private void Close()
diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
index 371fbf27ce..c0a0674e50 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
@@ -43,7 +43,7 @@ namespace Grpc.Core
/// A stream of messages to be read.
/// </summary>
/// <typeparam name="T"></typeparam>
- public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse>
+ public interface IAsyncStreamReader<T> : IAsyncEnumerator<T>
{
// TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface.
}
diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
index 2000210252..4e2acb9c71 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
@@ -50,5 +50,13 @@ namespace Grpc.Core
/// </summary>
/// <param name="message">the message to be written. Cannot be null.</param>
Task WriteAsync(T message);
+
+ /// <summary>
+ /// Write options that will be used for the next write.
+ /// If null, default options will be used.
+ /// Once set, this property maintains its value across subsequent
+ /// writes.
+ /// <value>The write options.</value>
+ WriteOptions WriteOptions { get; set; }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index bfcb9366a1..0db9d2a515 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -50,7 +50,7 @@ namespace Grpc.Core.Internal
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
- Channel channel;
+ readonly CallInvocationDetails<TRequest, TResponse> details;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@@ -60,26 +60,19 @@ namespace Grpc.Core.Internal
bool readObserverCompleted; // True if readObserver has already been completed.
- public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
+ public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
+ : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
- }
-
- public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName, Timespec deadline)
- {
- this.channel = channel;
- var call = channel.Handle.CreateCall(channel.CompletionRegistry, cq, methodName, channel.Target, deadline);
- channel.Environment.DebugStats.ActiveClientCalls.Increment();
- InitializeInternal(call);
+ this.details = callDetails;
+ this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
// it is reusing fair amount of code in this class, so we are leaving it here.
- // TODO: for other calls, you need to call Initialize, this methods calls initialize
- // on its own, so there's a usage inconsistency.
/// <summary>
/// Blocking unary request - unary response call.
/// </summary>
- public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers, DateTime deadline)
+ public TResponse UnaryCall(TRequest msg)
{
using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
@@ -89,17 +82,19 @@ namespace Grpc.Core.Internal
lock (myLock)
{
- Initialize(channel, cq, methodName, Timespec.FromDateTime(deadline));
+ Preconditions.CheckState(!started);
started = true;
+ Initialize(cq);
+
halfcloseRequested = true;
readingDone = true;
}
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
using (var ctx = BatchContextSafeHandle.Create())
{
- call.StartUnary(payload, ctx, metadataArray);
+ call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
@@ -129,22 +124,24 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - unary response call.
/// </summary>
- public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers, DateTime deadline)
+ public Task<TResponse> UnaryCallAsync(TRequest msg)
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
-
+ Preconditions.CheckState(!started);
started = true;
+
+ Initialize(details.Channel.Environment.CompletionQueue);
+
halfcloseRequested = true;
readingDone = true;
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartUnary(payload, HandleUnaryResponse, metadataArray);
+ call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
}
@@ -154,17 +151,19 @@ namespace Grpc.Core.Internal
/// Starts a streamed request - unary response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
- public Task<TResponse> ClientStreamingCallAsync(Metadata headers, DateTime deadline)
+ public Task<TResponse> ClientStreamingCallAsync()
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
-
+ Preconditions.CheckState(!started);
started = true;
+
+ Initialize(details.Channel.Environment.CompletionQueue);
+
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
@@ -176,21 +175,23 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - streamed response call.
/// </summary>
- public void StartServerStreamingCall(TRequest msg, Metadata headers, DateTime deadline)
+ public void StartServerStreamingCall(TRequest msg)
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
-
+ Preconditions.CheckState(!started);
started = true;
+
+ Initialize(details.Channel.Environment.CompletionQueue);
+
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg);
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartServerStreaming(payload, HandleFinished, metadataArray);
+ call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
}
}
@@ -199,15 +200,16 @@ namespace Grpc.Core.Internal
/// Starts a streaming request - streaming response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
- public void StartDuplexStreamingCall(Metadata headers, DateTime deadline)
+ public void StartDuplexStreamingCall()
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
-
+ Preconditions.CheckState(!started);
started = true;
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ Initialize(details.Channel.Environment.CompletionQueue);
+
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
@@ -218,9 +220,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@@ -277,6 +279,14 @@ namespace Grpc.Core.Internal
}
}
+ public CallInvocationDetails<TRequest, TResponse> Details
+ {
+ get
+ {
+ return this.details;
+ }
+ }
+
/// <summary>
/// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
@@ -309,7 +319,39 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
- channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ }
+
+ private void Initialize(CompletionQueueSafeHandle cq)
+ {
+ var propagationToken = details.Options.PropagationToken;
+ var parentCall = propagationToken != null ? propagationToken.ParentCall : CallSafeHandle.NullInstance;
+
+ var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
+ parentCall, ContextPropagationToken.DefaultMask, cq,
+ details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline));
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
+ InitializeInternal(call);
+ RegisterCancellationCallback();
+ }
+
+ // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
+ private void RegisterCancellationCallback()
+ {
+ var token = details.Options.CancellationToken;
+ if (token.CanBeCanceled)
+ {
+ token.Register(() => this.Cancel());
+ }
+ }
+
+ /// <summary>
+ /// Gets WriteFlags set in callDetails.Options.WriteOptions
+ /// </summary>
+ private WriteFlags GetWriteFlagsForCall()
+ {
+ var writeOptions = details.Options.WriteOptions;
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 38f2a5baeb..9fa0baca87 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -71,6 +71,9 @@ namespace Grpc.Core.Internal
protected bool halfclosed;
protected bool finished; // True if close has been received from the peer.
+ protected bool initialMetadataSent;
+ protected long streamingWritesCounter;
+
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = Preconditions.CheckNotNull(serializer);
@@ -123,7 +126,7 @@ namespace Grpc.Core.Internal
/// Initiates sending a message. Only one send operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate<object> completionDelegate)
+ protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
byte[] payload = UnsafeSerialize(msg);
@@ -132,8 +135,11 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendMessage(payload, HandleSendFinished);
+ call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
+
sendCompletionDelegate = completionDelegate;
+ initialMetadataSent = true;
+ streamingWritesCounter++;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 513902ee36..3710a65d6b 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -83,9 +83,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming response. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TResponse msg, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@@ -98,6 +98,35 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Initiates sending a initial metadata.
+ /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
+ /// to make things simpler.
+ /// completionDelegate is invoked upon completion.
+ /// </summary>
+ public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(headers, "metadata");
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
+ Preconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
+ CheckSendingAllowed();
+
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ {
+ call.StartSendInitialMetadata(HandleSendFinished, metadataArray);
+ }
+
+ this.initialMetadataSent = true;
+ sendCompletionDelegate = completionDelegate;
+ }
+ }
+
+ /// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
@@ -111,7 +140,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray);
+ call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 714749b171..3cb01e29bd 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -42,6 +42,8 @@ namespace Grpc.Core.Internal
/// </summary>
internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
+ public static readonly CallSafeHandle NullInstance = new CallSafeHandle();
+
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
@@ -53,7 +55,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
@@ -62,7 +64,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
- MetadataArraySafeHandle metadataArray);
+ MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
@@ -70,7 +72,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@@ -78,7 +80,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@@ -89,6 +91,10 @@ namespace Grpc.Core.Internal
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
+ static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
@@ -103,17 +109,17 @@ namespace Grpc.Core.Internal
this.completionRegistry = completionRegistry;
}
- public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
- public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
@@ -124,11 +130,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
+ grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
@@ -138,11 +144,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
+ public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback)
@@ -152,11 +158,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
- public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk();
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
@@ -173,6 +179,13 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
+ public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
+ }
+
public void Cancel()
{
grpcsharp_call_cancel(this).CheckOk();
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 20815efbd3..7f03bf4ea5 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -47,7 +47,17 @@ namespace Grpc.Core.Internal
static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs);
[DllImport("grpc_csharp_ext.dll")]
- static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
+ static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern ChannelState grpcsharp_channel_check_connectivity_state(ChannelSafeHandle channel, int tryToConnect);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern void grpcsharp_channel_watch_connectivity_state(ChannelSafeHandle channel, ChannelState lastObservedState,
+ Timespec deadline, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern CStringSafeHandle grpcsharp_channel_get_target(ChannelSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_channel_destroy(IntPtr channel);
@@ -66,13 +76,34 @@ namespace Grpc.Core.Internal
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
- public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
+ public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
- var result = grpcsharp_channel_create_call(this, cq, method, host, deadline);
+ var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
result.SetCompletionRegistry(registry);
return result;
}
+ public ChannelState CheckConnectivityState(bool tryToConnect)
+ {
+ return grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0);
+ }
+
+ public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq,
+ CompletionRegistry completionRegistry, BatchCompletionDelegate callback)
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
+ }
+
+ public string GetTarget()
+ {
+ using (var cstring = grpcsharp_channel_get_target(this))
+ {
+ return cstring.GetValue();
+ }
+ }
+
protected override bool ReleaseHandle()
{
grpcsharp_channel_destroy(handle);
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 58f493463b..013f00ff6f 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -40,16 +40,18 @@ namespace Grpc.Core.Internal
internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest>
{
readonly AsyncCall<TRequest, TResponse> call;
+ WriteOptions writeOptions;
public ClientRequestStream(AsyncCall<TRequest, TResponse> call)
{
this.call = call;
+ this.writeOptions = call.Details.Options.WriteOptions;
}
public Task WriteAsync(TRequest message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, taskSource.CompletionDelegate);
+ call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
@@ -59,5 +61,24 @@ namespace Grpc.Core.Internal
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
return taskSource.Task;
}
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return this.writeOptions;
+ }
+
+ set
+ {
+ writeOptions = value;
+ }
+ }
+
+ private WriteFlags GetWriteFlags()
+ {
+ var options = writeOptions;
+ return options != null ? options.Flags : default(WriteFlags);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 19f0e3c57f..688f9f6fec 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -75,7 +75,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -131,7 +131,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -187,7 +187,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
var result = await handler(requestStream, context);
@@ -247,7 +247,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
await handler(requestStream, responseStream, context);
@@ -304,13 +304,14 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
- public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, CancellationToken cancellationToken)
+ public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
+ where TRequest : class
+ where TResponse : class
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
- return new ServerCallContext(
- newRpc.Method, newRpc.Host, peer, realtimeDeadline,
- newRpc.RequestMetadata, cancellationToken);
+ return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline,
+ newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs
index 59238a452c..37a4f5256b 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs
@@ -42,7 +42,7 @@ namespace Grpc.Core.Internal
internal class ServerCredentialsSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)]
- static extern ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs);
+ static extern ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_credentials_release(IntPtr credentials);
@@ -51,12 +51,13 @@ namespace Grpc.Core.Internal
{
}
- public static ServerCredentialsSafeHandle CreateSslCredentials(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray)
+ public static ServerCredentialsSafeHandle CreateSslCredentials(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, bool forceClientAuth)
{
Preconditions.CheckArgument(keyCertPairCertChainArray.Length == keyCertPairPrivateKeyArray.Length);
return grpcsharp_ssl_server_credentials_create(pemRootCerts,
keyCertPairCertChainArray, keyCertPairPrivateKeyArray,
- new UIntPtr((ulong)keyCertPairCertChainArray.Length));
+ new UIntPtr((ulong)keyCertPairCertChainArray.Length),
+ forceClientAuth);
}
protected override bool ReleaseHandle()
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index 756dcee87f..03e39efc02 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -38,11 +38,12 @@ namespace Grpc.Core.Internal
/// <summary>
/// Writes responses asynchronously to an underlying AsyncCallServer object.
/// </summary>
- internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>
+ internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IHasWriteOptions
where TRequest : class
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
+ WriteOptions writeOptions;
public ServerResponseStream(AsyncCallServer<TRequest, TResponse> call)
{
@@ -52,7 +53,7 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, taskSource.CompletionDelegate);
+ call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
@@ -62,5 +63,31 @@ namespace Grpc.Core.Internal
call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task;
}
+
+ public Task WriteResponseHeadersAsync(Metadata responseHeaders)
+ {
+ var taskSource = new AsyncCompletionTaskSource<object>();
+ call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
+ return taskSource.Task;
+ }
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return writeOptions;
+ }
+
+ set
+ {
+ writeOptions = value;
+ }
+ }
+
+ private WriteFlags GetWriteFlags()
+ {
+ var options = writeOptions;
+ return options != null ? options.Flags : default(WriteFlags);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/KeyCertificatePair.cs b/src/csharp/Grpc.Core/KeyCertificatePair.cs
index 7cea18618e..5def15a656 100644
--- a/src/csharp/Grpc.Core/KeyCertificatePair.cs
+++ b/src/csharp/Grpc.Core/KeyCertificatePair.cs
@@ -33,7 +33,6 @@
using System;
using System.Collections.Generic;
-using System.Collections.Immutable;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs
index 2f308cbb11..a58dbdbc93 100644
--- a/src/csharp/Grpc.Core/Metadata.cs
+++ b/src/csharp/Grpc.Core/Metadata.cs
@@ -32,7 +32,6 @@
using System;
using System.Collections;
using System.Collections.Generic;
-using System.Collections.Immutable;
using System.Collections.Specialized;
using System.Runtime.InteropServices;
using System.Text;
@@ -115,6 +114,16 @@ namespace Grpc.Core
entries.Add(item);
}
+ public void Add(string key, string value)
+ {
+ Add(new Entry(key, value));
+ }
+
+ public void Add(string key, byte[] valueBytes)
+ {
+ Add(new Entry(key, valueBytes));
+ }
+
public void Clear()
{
CheckWriteable();
diff --git a/src/csharp/Grpc.Core/Method.cs b/src/csharp/Grpc.Core/Method.cs
index 77d36191c3..cc047ac9f8 100644
--- a/src/csharp/Grpc.Core/Method.cs
+++ b/src/csharp/Grpc.Core/Method.cs
@@ -53,16 +53,20 @@ namespace Grpc.Core
public class Method<TRequest, TResponse>
{
readonly MethodType type;
+ readonly string serviceName;
readonly string name;
readonly Marshaller<TRequest> requestMarshaller;
readonly Marshaller<TResponse> responseMarshaller;
+ readonly string fullName;
- public Method(MethodType type, string name, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller)
+ public Method(MethodType type, string serviceName, string name, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller)
{
this.type = type;
- this.name = name;
- this.requestMarshaller = requestMarshaller;
- this.responseMarshaller = responseMarshaller;
+ this.serviceName = Preconditions.CheckNotNull(serviceName);
+ this.name = Preconditions.CheckNotNull(name);
+ this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller);
+ this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller);
+ this.fullName = GetFullName(serviceName);
}
public MethodType Type
@@ -72,6 +76,14 @@ namespace Grpc.Core
return this.type;
}
}
+
+ public string ServiceName
+ {
+ get
+ {
+ return this.serviceName;
+ }
+ }
public string Name
{
@@ -97,6 +109,14 @@ namespace Grpc.Core
}
}
+ public string FullName
+ {
+ get
+ {
+ return this.fullName;
+ }
+ }
+
/// <summary>
/// Gets full name of the method including the service name.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index 3217547cc4..eb5b043d1c 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -32,7 +32,7 @@
#endregion
using System;
-using System.Collections.Concurrent;
+using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
@@ -48,18 +48,17 @@ namespace Grpc.Core
/// </summary>
public class Server
{
- /// <summary>
- /// Pass this value as port to have the server choose an unused listening port for you.
- /// </summary>
- public const int PickUnusedPort = 0;
-
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
+ readonly ServiceDefinitionCollection serviceDefinitions;
+ readonly ServerPortCollection ports;
readonly GrpcEnvironment environment;
readonly List<ChannelOption> options;
readonly ServerSafeHandle handle;
readonly object myLock = new object();
+ readonly List<ServerServiceDefinition> serviceDefinitionsList = new List<ServerServiceDefinition>();
+ readonly List<ServerPort> serverPortList = new List<ServerPort>();
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
@@ -72,6 +71,8 @@ namespace Grpc.Core
/// <param name="options">Channel options.</param>
public Server(IEnumerable<ChannelOption> options = null)
{
+ this.serviceDefinitions = new ServiceDefinitionCollection(this);
+ this.ports = new ServerPortCollection(this);
this.environment = GrpcEnvironment.GetInstance();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
@@ -81,47 +82,26 @@ namespace Grpc.Core
}
/// <summary>
- /// Adds a service definition to the server. This is how you register
- /// handlers for a service with the server.
- /// Only call this before Start().
+ /// Services that will be exported by the server once started. Register a service with this
+ /// server by adding its definition to this collection.
/// </summary>
- public void AddServiceDefinition(ServerServiceDefinition serviceDefinition)
+ public ServiceDefinitionCollection Services
{
- lock (myLock)
+ get
{
- Preconditions.CheckState(!startRequested);
- foreach (var entry in serviceDefinition.CallHandlers)
- {
- callHandlers.Add(entry.Key, entry.Value);
- }
+ return serviceDefinitions;
}
}
/// <summary>
- /// Add a port on which server should listen.
- /// Only call this before Start().
+ /// Ports on which the server will listen once started. Register a port with this
+ /// server by adding its definition to this collection.
/// </summary>
- /// <returns>The port on which server will be listening.</returns>
- /// <param name="host">the host</param>
- /// <param name="port">the port. If zero, an unused port is chosen automatically.</param>
- public int AddPort(string host, int port, ServerCredentials credentials)
+ public ServerPortCollection Ports
{
- lock (myLock)
+ get
{
- Preconditions.CheckNotNull(credentials);
- Preconditions.CheckState(!startRequested);
- var address = string.Format("{0}:{1}", host, port);
- using (var nativeCredentials = credentials.ToNativeCredentials())
- {
- if (nativeCredentials != null)
- {
- return handle.AddSecurePort(address, nativeCredentials);
- }
- else
- {
- return handle.AddInsecurePort(address);
- }
- }
+ return ports;
}
}
@@ -190,6 +170,50 @@ namespace Grpc.Core
}
/// <summary>
+ /// Adds a service definition.
+ /// </summary>
+ private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckState(!startRequested);
+ foreach (var entry in serviceDefinition.CallHandlers)
+ {
+ callHandlers.Add(entry.Key, entry.Value);
+ }
+ serviceDefinitionsList.Add(serviceDefinition);
+ }
+ }
+
+ /// <summary>
+ /// Adds a listening port.
+ /// </summary>
+ private int AddPortInternal(ServerPort serverPort)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(serverPort.Credentials);
+ Preconditions.CheckState(!startRequested);
+ var address = string.Format("{0}:{1}", serverPort.Host, serverPort.Port);
+ int boundPort;
+ using (var nativeCredentials = serverPort.Credentials.ToNativeCredentials())
+ {
+ if (nativeCredentials != null)
+ {
+ boundPort = handle.AddSecurePort(address, nativeCredentials);
+ }
+ else
+ {
+ boundPort = handle.AddInsecurePort(address);
+ }
+ }
+ var newServerPort = new ServerPort(serverPort, boundPort);
+ this.serverPortList.Add(newServerPort);
+ return boundPort;
+ }
+ }
+
+ /// <summary>
/// Allows one new RPC call to be received by server.
/// </summary>
private void AllowOneRpc()
@@ -249,5 +273,82 @@ namespace Grpc.Core
{
shutdownTcs.SetResult(null);
}
+
+ /// <summary>
+ /// Collection of service definitions.
+ /// </summary>
+ public class ServiceDefinitionCollection : IEnumerable<ServerServiceDefinition>
+ {
+ readonly Server server;
+
+ internal ServiceDefinitionCollection(Server server)
+ {
+ this.server = server;
+ }
+
+ /// <summary>
+ /// Adds a service definition to the server. This is how you register
+ /// handlers for a service with the server. Only call this before Start().
+ /// </summary>
+ public void Add(ServerServiceDefinition serviceDefinition)
+ {
+ server.AddServiceDefinitionInternal(serviceDefinition);
+ }
+
+ public IEnumerator<ServerServiceDefinition> GetEnumerator()
+ {
+ return server.serviceDefinitionsList.GetEnumerator();
+ }
+
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return server.serviceDefinitionsList.GetEnumerator();
+ }
+ }
+
+ /// <summary>
+ /// Collection of server ports.
+ /// </summary>
+ public class ServerPortCollection : IEnumerable<ServerPort>
+ {
+ readonly Server server;
+
+ internal ServerPortCollection(Server server)
+ {
+ this.server = server;
+ }
+
+ /// <summary>
+ /// Adds a new port on which server should listen.
+ /// Only call this before Start().
+ /// <returns>The port on which server will be listening.</returns>
+ /// </summary>
+ public int Add(ServerPort serverPort)
+ {
+ return server.AddPortInternal(serverPort);
+ }
+
+ /// <summary>
+ /// Adds a new port on which server should listen.
+ /// <returns>The port on which server will be listening.</returns>
+ /// </summary>
+ /// <param name="host">the host</param>
+ /// <param name="port">the port. If zero, an unused port is chosen automatically.</param>
+ /// <param name="credentials">credentials to use to secure this port.</param>
+ public int Add(string host, int port, ServerCredentials credentials)
+ {
+ return Add(new ServerPort(host, port, credentials));
+ }
+
+ public IEnumerator<ServerPort> GetEnumerator()
+ {
+ return server.serverPortList.GetEnumerator();
+ }
+
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return server.serverPortList.GetEnumerator();
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs
index 0c48adaea5..75d81c64f3 100644
--- a/src/csharp/Grpc.Core/ServerCallContext.cs
+++ b/src/csharp/Grpc.Core/ServerCallContext.cs
@@ -36,15 +36,16 @@ using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
+using Grpc.Core.Internal;
+
namespace Grpc.Core
{
/// <summary>
/// Context for a server-side call.
/// </summary>
- public sealed class ServerCallContext
+ public class ServerCallContext
{
- // TODO(jtattermusch): expose method to send initial metadata back to client
-
+ private readonly CallSafeHandle callHandle;
private readonly string method;
private readonly string host;
private readonly string peer;
@@ -54,18 +55,37 @@ namespace Grpc.Core
private readonly Metadata responseTrailers = new Metadata();
private Status status = Status.DefaultSuccess;
+ private Func<Metadata, Task> writeHeadersFunc;
+ private IHasWriteOptions writeOptionsHolder;
- public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken)
+ internal ServerCallContext(CallSafeHandle callHandle, string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
+ Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder)
{
+ this.callHandle = callHandle;
this.method = method;
this.host = host;
this.peer = peer;
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
+ this.writeHeadersFunc = writeHeadersFunc;
+ this.writeOptionsHolder = writeOptionsHolder;
+ }
+
+ public Task WriteResponseHeadersAsync(Metadata responseHeaders)
+ {
+ return writeHeadersFunc(responseHeaders);
+ }
+
+ /// <summary>
+ /// Creates a propagation token to be used to propagate call context to a child call.
+ /// </summary>
+ public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null)
+ {
+ return new ContextPropagationToken(callHandle, deadline, cancellationToken, options);
}
- /// <summary> Name of method called in this RPC. </summary>
+ /// <summary>Name of method called in this RPC.</summary>
public string Method
{
get
@@ -74,7 +94,7 @@ namespace Grpc.Core
}
}
- /// <summary> Name of host called in this RPC. </summary>
+ /// <summary>Name of host called in this RPC.</summary>
public string Host
{
get
@@ -83,7 +103,7 @@ namespace Grpc.Core
}
}
- /// <summary> Address of the remote endpoint in URI format. </summary>
+ /// <summary>Address of the remote endpoint in URI format.</summary>
public string Peer
{
get
@@ -92,7 +112,7 @@ namespace Grpc.Core
}
}
- /// <summary> Deadline for this RPC. </summary>
+ /// <summary>Deadline for this RPC.</summary>
public DateTime Deadline
{
get
@@ -101,7 +121,7 @@ namespace Grpc.Core
}
}
- /// <summary> Initial metadata sent by client. </summary>
+ /// <summary>Initial metadata sent by client.</summary>
public Metadata RequestHeaders
{
get
@@ -110,8 +130,7 @@ namespace Grpc.Core
}
}
- // TODO(jtattermusch): support signalling cancellation.
- /// <summary> Cancellation token signals when call is cancelled. </summary>
+ /// <summary>Cancellation token signals when call is cancelled.</summary>
public CancellationToken CancellationToken
{
get
@@ -120,7 +139,7 @@ namespace Grpc.Core
}
}
- /// <summary> Trailers to send back to client after RPC finishes.</summary>
+ /// <summary>Trailers to send back to client after RPC finishes.</summary>
public Metadata ResponseTrailers
{
get
@@ -142,5 +161,31 @@ namespace Grpc.Core
status = value;
}
}
+
+ /// <summary>
+ /// Allows setting write options for the following write.
+ /// For streaming response calls, this property is also exposed as on IServerStreamWriter for convenience.
+ /// Both properties are backed by the same underlying value.
+ /// </summary>
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return writeOptionsHolder.WriteOptions;
+ }
+
+ set
+ {
+ writeOptionsHolder.WriteOptions = value;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Allows sharing write options between ServerCallContext and other objects.
+ /// </summary>
+ public interface IHasWriteOptions
+ {
+ WriteOptions WriteOptions { get; set; }
}
}
diff --git a/src/csharp/Grpc.Core/ServerCredentials.cs b/src/csharp/Grpc.Core/ServerCredentials.cs
index 32ed4b78a1..c11a1ede08 100644
--- a/src/csharp/Grpc.Core/ServerCredentials.cs
+++ b/src/csharp/Grpc.Core/ServerCredentials.cs
@@ -33,7 +33,6 @@
using System;
using System.Collections.Generic;
-using System.Collections.Immutable;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
@@ -80,18 +79,26 @@ namespace Grpc.Core
{
readonly IList<KeyCertificatePair> keyCertificatePairs;
readonly string rootCertificates;
+ readonly bool forceClientAuth;
/// <summary>
/// Creates server-side SSL credentials.
/// </summary>
- /// <param name="rootCertificates">PEM encoded client root certificates used to authenticate client.</param>
/// <param name="keyCertificatePairs">Key-certificates to use.</param>
- public SslServerCredentials(IEnumerable<KeyCertificatePair> keyCertificatePairs, string rootCertificates)
+ /// <param name="rootCertificates">PEM encoded client root certificates used to authenticate client.</param>
+ /// <param name="forceClientAuth">If true, client will be rejected unless it proves its unthenticity using against rootCertificates.</param>
+ public SslServerCredentials(IEnumerable<KeyCertificatePair> keyCertificatePairs, string rootCertificates, bool forceClientAuth)
{
this.keyCertificatePairs = new List<KeyCertificatePair>(keyCertificatePairs).AsReadOnly();
Preconditions.CheckArgument(this.keyCertificatePairs.Count > 0,
"At least one KeyCertificatePair needs to be provided");
+ if (forceClientAuth)
+ {
+ Preconditions.CheckNotNull(rootCertificates,
+ "Cannot force client authentication unless you provide rootCertificates.");
+ }
this.rootCertificates = rootCertificates;
+ this.forceClientAuth = forceClientAuth;
}
/// <summary>
@@ -100,7 +107,7 @@ namespace Grpc.Core
/// using client root certificates.
/// </summary>
/// <param name="keyCertificatePairs">Key-certificates to use.</param>
- public SslServerCredentials(IEnumerable<KeyCertificatePair> keyCertificatePairs) : this(keyCertificatePairs, null)
+ public SslServerCredentials(IEnumerable<KeyCertificatePair> keyCertificatePairs) : this(keyCertificatePairs, null, false)
{
}
@@ -126,6 +133,17 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// If true, the authenticity of client check will be enforced.
+ /// </summary>
+ public bool ForceClientAuthentication
+ {
+ get
+ {
+ return this.forceClientAuth;
+ }
+ }
+
internal override ServerCredentialsSafeHandle ToNativeCredentials()
{
int count = keyCertificatePairs.Count;
@@ -136,7 +154,7 @@ namespace Grpc.Core
certChains[i] = keyCertificatePairs[i].CertificateChain;
keys[i] = keyCertificatePairs[i].PrivateKey;
}
- return ServerCredentialsSafeHandle.CreateSslCredentials(rootCertificates, certChains, keys);
+ return ServerCredentialsSafeHandle.CreateSslCredentials(rootCertificates, certChains, keys, forceClientAuth);
}
}
}
diff --git a/src/csharp/Grpc.Core/ServerPort.cs b/src/csharp/Grpc.Core/ServerPort.cs
new file mode 100644
index 0000000000..55e4bd0062
--- /dev/null
+++ b/src/csharp/Grpc.Core/ServerPort.cs
@@ -0,0 +1,120 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+using Grpc.Core.Utils;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// A port exposed by a server.
+ /// </summary>
+ public class ServerPort
+ {
+ /// <summary>
+ /// Pass this value as port to have the server choose an unused listening port for you.
+ /// Ports added to a server will contain the bound port in their <see cref="BoundPort"/> property.
+ /// </summary>
+ public const int PickUnused = 0;
+
+ readonly string host;
+ readonly int port;
+ readonly ServerCredentials credentials;
+ readonly int boundPort;
+
+ /// <summary>
+ /// Creates a new port on which server should listen.
+ /// </summary>
+ /// <returns>The port on which server will be listening.</returns>
+ /// <param name="host">the host</param>
+ /// <param name="port">the port. If zero, an unused port is chosen automatically.</param>
+ /// <param name="credentials">credentials to use to secure this port.</param>
+ public ServerPort(string host, int port, ServerCredentials credentials)
+ {
+ this.host = Preconditions.CheckNotNull(host);
+ this.port = port;
+ this.credentials = Preconditions.CheckNotNull(credentials);
+ }
+
+ /// <summary>
+ /// Creates a port from an existing <c>ServerPort</c> instance and boundPort value.
+ /// </summary>
+ internal ServerPort(ServerPort serverPort, int boundPort)
+ {
+ this.host = serverPort.host;
+ this.port = serverPort.port;
+ this.credentials = serverPort.credentials;
+ this.boundPort = boundPort;
+ }
+
+ /// <value>The host.</value>
+ public string Host
+ {
+ get
+ {
+ return host;
+ }
+ }
+
+ /// <value>The port.</value>
+ public int Port
+ {
+ get
+ {
+ return port;
+ }
+ }
+
+ /// <value>The server credentials.</value>
+ public ServerCredentials Credentials
+ {
+ get
+ {
+ return credentials;
+ }
+ }
+
+ /// <value>
+ /// The port actually bound by the server. This is useful if you let server
+ /// pick port automatically. <see cref="PickUnused"/>
+ /// </value>
+ public int BoundPort
+ {
+ get
+ {
+ return boundPort;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs
index b180186c12..a00d156e52 100644
--- a/src/csharp/Grpc.Core/ServerServiceDefinition.cs
+++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs
@@ -33,7 +33,7 @@
using System;
using System.Collections.Generic;
-using System.Collections.Immutable;
+using System.Collections.ObjectModel;
using Grpc.Core.Internal;
namespace Grpc.Core
@@ -43,14 +43,14 @@ namespace Grpc.Core
/// </summary>
public class ServerServiceDefinition
{
- readonly ImmutableDictionary<string, IServerCallHandler> callHandlers;
+ readonly ReadOnlyDictionary<string, IServerCallHandler> callHandlers;
- private ServerServiceDefinition(ImmutableDictionary<string, IServerCallHandler> callHandlers)
+ private ServerServiceDefinition(Dictionary<string, IServerCallHandler> callHandlers)
{
- this.callHandlers = callHandlers;
+ this.callHandlers = new ReadOnlyDictionary<string, IServerCallHandler>(callHandlers);
}
- internal ImmutableDictionary<string, IServerCallHandler> CallHandlers
+ internal IDictionary<string, IServerCallHandler> CallHandlers
{
get
{
@@ -115,7 +115,7 @@ namespace Grpc.Core
public ServerServiceDefinition Build()
{
- return new ServerServiceDefinition(callHandlers.ToImmutableDictionary());
+ return new ServerServiceDefinition(callHandlers);
}
}
}
diff --git a/src/csharp/Grpc.Core/Version.cs b/src/csharp/Grpc.Core/Version.cs
index b5cb652945..d2a029fbb4 100644
--- a/src/csharp/Grpc.Core/Version.cs
+++ b/src/csharp/Grpc.Core/Version.cs
@@ -2,4 +2,4 @@ using System.Reflection;
using System.Runtime.CompilerServices;
// The current version of gRPC C#.
-[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".*")]
+[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".0")]
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index 656a3d47bb..939372e237 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -8,6 +8,6 @@ namespace Grpc.Core
/// <summary>
/// Current version of gRPC
/// </summary>
- public const string CurrentVersion = "0.6.0";
+ public const string CurrentVersion = "0.6.1";
}
}
diff --git a/src/csharp/Grpc.Core/WriteOptions.cs b/src/csharp/Grpc.Core/WriteOptions.cs
new file mode 100644
index 0000000000..7ef3189d76
--- /dev/null
+++ b/src/csharp/Grpc.Core/WriteOptions.cs
@@ -0,0 +1,82 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Flags for write operations.
+ /// </summary>
+ [Flags]
+ public enum WriteFlags
+ {
+ /// <summary>
+ /// Hint that the write may be buffered and need not go out on the wire immediately.
+ /// gRPC is free to buffer the message until the next non-buffered
+ /// write, or until write stream completion, but it need not buffer completely or at all.
+ /// </summary>
+ BufferHint = 0x1,
+
+ /// <summary>
+ /// Force compression to be disabled for a particular write.
+ /// </summary>
+ NoCompress = 0x2
+ }
+
+ /// <summary>
+ /// Options for write operations.
+ /// </summary>
+ public class WriteOptions
+ {
+ /// <summary>
+ /// Default write options.
+ /// </summary>
+ public static readonly WriteOptions Default = new WriteOptions();
+
+ private WriteFlags flags;
+
+ public WriteOptions(WriteFlags flags = default(WriteFlags))
+ {
+ this.flags = flags;
+ }
+
+ public WriteFlags Flags
+ {
+ get
+ {
+ return this.flags;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/packages.config b/src/csharp/Grpc.Core/packages.config
index 6cdcdf2656..9b12b9b096 100644
--- a/src/csharp/Grpc.Core/packages.config
+++ b/src/csharp/Grpc.Core/packages.config
@@ -3,5 +3,4 @@
<package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" />
<package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" />
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
- <package id="System.Collections.Immutable" version="1.1.36" targetFramework="net45" />
</packages> \ No newline at end of file