diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-06-15 11:36:02 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-06-15 11:36:02 -0700 |
commit | 18b4b8379259ffa73f8b5414a09bcb682d96b200 (patch) | |
tree | beabff9da156628f36f00504de6c24c9ef3a5393 | |
parent | 5e84be56dbec81fbd9f654943378aa15dba49c1e (diff) | |
parent | a3697b451bdfbfa653d42f4b250b5950dd4b61bb (diff) |
Merge pull request #19 from soltanmm/ct-you-complete-me
Fix upstream merge conflicts
34 files changed, 935 insertions, 401 deletions
diff --git a/doc/connectivity-semantics-and-api.md b/doc/connectivity-semantics-and-api.md new file mode 100644 index 0000000000..842ff8adc1 --- /dev/null +++ b/doc/connectivity-semantics-and-api.md @@ -0,0 +1,122 @@ +gRPC Connectivity Semantics and API +=================================== + +This document describes the connectivity semantics for gRPC channels and the +corresponding impact on RPCs. We then discuss an API. + +States of Connectivity +---------------------- + +gRPC Channels provide the abstraction over which clients can communicate with +servers.The client-side channel object can be constructed using little more +than a DNS name. Channels encapsulate a range of functionality including name +resolution, establishing a TCP connection (with retries and backoff) and TLS +handshakes. Channels can also handle errors on established connections and +reconnect, or in the case of HTTP/2 GO_AWAY, re-resolve the name and reconnect. + +To hide the details of all this activity from the user of the gRPC API (i.e., +application code) while exposing meaningful information about the state of a +channel, we use a state machine with four states, defined below: + +CONNECTING: The channel is trying to establish a connection and is waiting to +make progress on one of the steps involved in name resolution, TCP connection +establishment or TLS handshake. This is the initial state for all channels upon +creation. + +READY: The channel has successfully established a connection all the way +through TLS handshake (or equivalent) and all subsequent attempt to communicate +have succeeded (or are pending without any known failure ). + +TRANSIENT_FAILURE: There has been some transient failure (such as a TCP 3-way +handshake timing out or a socket error). Channels in this state will eventually +switch to the CONNECTING state and try to establish a connection again. Since +retries are done with exponential backoff, channels that fail to connect will +start out spending very little time in this state but as the attempts fail +repeatedly, the channel will spend increasingly large amounts of time in this +state. For many non-fatal failures (e.g., TCP connection attempts timing out +because the server is not yet available), the channel may be stuck in this +state for an indefinitely large amount of time. + +FATAL_FAILURE: There has been a fatal failure and the channel will never +attempt to establish a connection again. (e.g., a server presenting an invalid +TLS certificate) + +Channels that enter this state never leave this state. + +The following table lists the legal transitions from one state to another and +corresponding reasons. Empty cells denote disallowed transitions. + +<table style='border: 1px solid black'> + <tr> + <th>From/To</th> + <th>CONNECTING</th> + <th>READY</th> + <th>TRANSIENT_FAILURE</th> + <th>FATAL_FAILURE</th> + </tr> + <tr> + <th>CONNECTING</th> + <td>Incremental progress during connection establishment</td> + <td>All steps needed to establish a connection succeeded</td> + <td>Any failure in any of the steps needed to establish connection</td> + <td>Fatal failure encountered while attempting a connection.</td> + </tr> + <tr> + <th>READY</th> + <td></td> + <td>Incremental successful communication on established channel.</td> + <td>Any failure encountered while expecting successful communication on + established channel.</td> + <td></td> + </tr> + <tr> + <th>TRANSIENT_FAILURE</th> + <td>Wait time required to implement (exponential) backoff is over.</td> + <td></td> + <td></td> + <td></td> + </tr> + <tr> + <th>FATAL_FAILURE</th> + <td></td> + <td></td> + <td></td> + <td></td> + </tr> +</table> + + +Channel State API +----------------- + +All gRPC libraries will expose a channel-level API method to poll the current +state of a channel. In C++, this method is called GetCurrentState and returns +an enum for one of the four legal states. + +All libraries should also expose an API that enables the application (user of +the gRPC API) to be notified when the channel state changes. Since state +changes can be rapid and race with any such notification, the notification +should just inform the user that some state change has happened, leaving it to +the user to poll the channel for the current state. + +The synchronous version of this API is: + +```cpp +bool WaitForStateChange(gpr_timespec deadline, ChannelState source_state); +``` + +which returns true when the state changes to something other than the +source_state and false if the deadline expires. Asynchronous and futures based +APIs should have a corresponding method that allows the application to be +notified when the state of a channel changes. + +Note that a notification is delivered every time there is a transition from any +state to any *other* state. On the other hand the rules for legal state +transition, require a transition from CONNECTING to TRANSIENT_FAILURE and back +to CONNECTING for every recoverable failure, even if the corresponding +exponential backoff requires no wait before retry. The combined effect is that +the application may receive state change notifications that appear spurious. +e.g., an application waiting for state changes on a channel that is CONNECTING +may receive a state change notification but find the channel in the same +CONNECTING state on polling for current state because the channel may have +spent infinitesimally small amount of time in the TRANSIENT_FAILURE state. diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 8c7f272609..d9b7ca216b 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -221,7 +221,7 @@ typedef enum { GRPC_OP_SEND_INITIAL_METADATA = 0, /* Send a message: 0 or more of these operations can occur for each call */ GRPC_OP_SEND_MESSAGE, - /* Send a close from the server: one and only one instance MUST be sent from + /* Send a close from the client: one and only one instance MUST be sent from the client, unless the call was cancelled - in which case this can be skipped */ GRPC_OP_SEND_CLOSE_FROM_CLIENT, @@ -240,7 +240,7 @@ typedef enum { the status will indicate some failure. */ GRPC_OP_RECV_STATUS_ON_CLIENT, - /* Receive status on the server: one and only one must be made on the server + /* Receive close on the server: one and only one must be made on the server */ GRPC_OP_RECV_CLOSE_ON_SERVER } grpc_op_type; diff --git a/include/grpc/support/slice.h b/include/grpc/support/slice.h index 9026602f15..3ee103bfb9 100644 --- a/include/grpc/support/slice.h +++ b/include/grpc/support/slice.h @@ -110,8 +110,9 @@ gpr_slice gpr_slice_ref(gpr_slice s); /* Decrement the ref count of s. If the ref count of s reaches zero, all slices sharing the ref count are destroyed, and considered no longer initialized. If s is ultimately derived from a call to gpr_slice_new(start, - len, dest) where dest!=NULL , then (*dest)(start, len) is called. Requires - s initialized. */ + len, dest) where dest!=NULL , then (*dest)(start) is called, else if s is + ultimately derived from a call to gpr_slice_new_with_len(start, len, dest) + where dest!=NULL , then (*dest)(start, len). Requires s initialized. */ void gpr_slice_unref(gpr_slice s); /* Create a slice pointing at some data. Calls malloc to allocate a refcount diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index b0d2b5d229..c00c85bb90 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -849,6 +849,9 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer, "::grpc::Status $ns$$Service$::Service::$Method$(" "::grpc::ServerContext* context, " "const $Request$* request, $Response$* response) {\n"); + printer->Print(" (void) context;\n"); + printer->Print(" (void) request;\n"); + printer->Print(" (void) response;\n"); printer->Print( " return ::grpc::Status(" "::grpc::StatusCode::UNIMPLEMENTED);\n"); @@ -859,6 +862,9 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer, "::grpc::ServerContext* context, " "::grpc::ServerReader< $Request$>* reader, " "$Response$* response) {\n"); + printer->Print(" (void) context;\n"); + printer->Print(" (void) reader;\n"); + printer->Print(" (void) response;\n"); printer->Print( " return ::grpc::Status(" "::grpc::StatusCode::UNIMPLEMENTED);\n"); @@ -869,6 +875,9 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer, "::grpc::ServerContext* context, " "const $Request$* request, " "::grpc::ServerWriter< $Response$>* writer) {\n"); + printer->Print(" (void) context;\n"); + printer->Print(" (void) request;\n"); + printer->Print(" (void) writer;\n"); printer->Print( " return ::grpc::Status(" "::grpc::StatusCode::UNIMPLEMENTED);\n"); @@ -879,6 +888,8 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer, "::grpc::ServerContext* context, " "::grpc::ServerReaderWriter< $Response$, $Request$>* " "stream) {\n"); + printer->Print(" (void) context;\n"); + printer->Print(" (void) stream;\n"); printer->Print( " return ::grpc::Status(" "::grpc::StatusCode::UNIMPLEMENTED);\n"); diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c index 159c7e052c..f878e6aaa1 100644 --- a/src/core/support/log_win32.c +++ b/src/core/support/log_win32.c @@ -42,6 +42,7 @@ #include <grpc/support/log_win32.h> #include <grpc/support/log.h> #include <grpc/support/time.h> +#include <grpc/support/string_util.h> #include "src/core/support/string.h" #include "src/core/support/string_win32.h" @@ -106,7 +107,7 @@ char *gpr_format_message(DWORD messageid) { NULL, messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR)(&tmessage), 0, NULL); - if (status == 0) return gpr_strdup("Unable to retreive error string"); + if (status == 0) return gpr_strdup("Unable to retrieve error string"); message = gpr_tchar_to_char(tmessage); LocalFree(tmessage); return message; diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 95497a3cc8..5215cc87b1 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -58,7 +58,7 @@ typedef enum grpc_stream_op_code { GRPC_OP_SLICE } grpc_stream_op_code; -/* Arguments for GRPC_OP_BEGIN */ +/* Arguments for GRPC_OP_BEGIN_MESSAGE */ typedef struct grpc_begin_message { /* How many bytes of data will this message contain */ gpr_uint32 length; @@ -126,10 +126,8 @@ typedef struct grpc_stream_op { } data; } grpc_stream_op; -/* A stream op buffer is a wrapper around stream operations that is dynamically - extendable. - TODO(ctiller): inline a few elements into the struct, to avoid common case - per-call allocations. */ +/** A stream op buffer is a wrapper around stream operations that is + * dynamically extendable. */ typedef struct grpc_stream_op_buffer { grpc_stream_op *ops; size_t nops; diff --git a/src/csharp/Grpc.Core.Tests/ChannelOptionsTest.cs b/src/csharp/Grpc.Core.Tests/ChannelOptionsTest.cs new file mode 100644 index 0000000000..df09857efe --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ChannelOptionsTest.cs @@ -0,0 +1,105 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + public class ChannelOptionsTest + { + [Test] + public void IntOption() + { + var option = new ChannelOption("somename", 1); + + Assert.AreEqual(ChannelOption.OptionType.Integer, option.Type); + Assert.AreEqual("somename", option.Name); + Assert.AreEqual(1, option.IntValue); + Assert.Throws(typeof(InvalidOperationException), () => { var s = option.StringValue; }); + } + + [Test] + public void StringOption() + { + var option = new ChannelOption("somename", "ABCDEF"); + + Assert.AreEqual(ChannelOption.OptionType.String, option.Type); + Assert.AreEqual("somename", option.Name); + Assert.AreEqual("ABCDEF", option.StringValue); + Assert.Throws(typeof(InvalidOperationException), () => { var s = option.IntValue; }); + } + + [Test] + public void ConstructorPreconditions() + { + Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption(null, "abc"); }); + Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption(null, 1); }); + Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption("abc", null); }); + } + + [Test] + public void CreateChannelArgsNull() + { + var channelArgs = ChannelOptions.CreateChannelArgs(null); + Assert.IsTrue(channelArgs.IsInvalid); + } + + [Test] + public void CreateChannelArgsEmpty() + { + var options = new List<ChannelOption>(); + var channelArgs = ChannelOptions.CreateChannelArgs(options); + channelArgs.Dispose(); + } + + [Test] + public void CreateChannelArgs() + { + var options = new List<ChannelOption> + { + new ChannelOption("ABC", "XYZ"), + new ChannelOption("somename", "IJKLM"), + new ChannelOption("intoption", 12345), + new ChannelOption("GHIJK", 12345), + }; + + var channelArgs = ChannelOptions.CreateChannelArgs(options); + channelArgs.Dispose(); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 029653967b..92e28b7d74 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -3,8 +3,6 @@ <PropertyGroup> <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> - <ProductVersion>8.0.30703</ProductVersion> - <SchemaVersion>2.0</SchemaVersion> <ProjectGuid>{86EC5CB4-4EA2-40A2-8057-86542A0353BB}</ProjectGuid> <OutputType>Library</OutputType> <RootNamespace>Grpc.Core.Tests</RootNamespace> @@ -48,6 +46,8 @@ <Compile Include="Internal\MetadataArraySafeHandleTest.cs" /> <Compile Include="Internal\CompletionQueueSafeHandleTest.cs" /> <Compile Include="Internal\CompletionQueueEventTest.cs" /> + <Compile Include="Internal\ChannelArgsSafeHandleTest.cs" /> + <Compile Include="ChannelOptionsTest.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> @@ -63,4 +63,4 @@ <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" /> </ItemGroup> <ItemGroup /> -</Project>
\ No newline at end of file +</Project> diff --git a/src/csharp/Grpc.Core.Tests/Internal/ChannelArgsSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/ChannelArgsSafeHandleTest.cs new file mode 100644 index 0000000000..af0aaa5f01 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/ChannelArgsSafeHandleTest.cs @@ -0,0 +1,75 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + public class ChannelArgsSafeHandleTest + { + [Test] + public void CreateEmptyAndDestroy() + { + var channelArgs = ChannelArgsSafeHandle.Create(0); + channelArgs.Dispose(); + } + + [Test] + public void CreateNonEmptyAndDestroy() + { + var channelArgs = ChannelArgsSafeHandle.Create(5); + channelArgs.Dispose(); + } + + [Test] + public void CreateNullAndDestroy() + { + var channelArgs = ChannelArgsSafeHandle.CreateNull(); + channelArgs.Dispose(); + } + + [Test] + public void CreateFillAndDestroy() + { + var channelArgs = ChannelArgsSafeHandle.Create(3); + channelArgs.SetInteger(0, "somekey", 12345); + channelArgs.SetString(1, "somekey", "abcdefghijkl"); + channelArgs.SetString(2, "somekey", "XYZ"); + channelArgs.Dispose(); + } + } +} diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 44b610f65b..d6bfbb7bc4 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -29,6 +29,7 @@ // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #endregion using System; +using System.Collections.Generic; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -50,10 +51,10 @@ namespace Grpc.Core /// </summary> /// <param name="host">The DNS name of IP address of the host.</param> /// <param name="credentials">Optional credentials to create a secure channel.</param> - /// <param name="channelArgs">Optional channel arguments.</param> - public Channel(string host, Credentials credentials = null, ChannelArgs channelArgs = null) + /// <param name="options">Channel options.</param> + public Channel(string host, Credentials credentials = null, IEnumerable<ChannelOption> options = null) { - using (ChannelArgsSafeHandle nativeChannelArgs = CreateNativeChannelArgs(channelArgs)) + using (ChannelArgsSafeHandle nativeChannelArgs = ChannelOptions.CreateChannelArgs(options)) { if (credentials != null) { @@ -67,7 +68,7 @@ namespace Grpc.Core this.handle = ChannelSafeHandle.Create(host, nativeChannelArgs); } } - this.target = GetOverridenTarget(host, channelArgs); + this.target = GetOverridenTarget(host, options); } /// <summary> @@ -76,9 +77,9 @@ namespace Grpc.Core /// <param name="host">DNS name or IP address</param> /// <param name="port">the port</param> /// <param name="credentials">Optional credentials to create a secure channel.</param> - /// <param name="channelArgs">Optional channel arguments.</param> - public Channel(string host, int port, Credentials credentials = null, ChannelArgs channelArgs = null) : - this(string.Format("{0}:{1}", host, port), credentials, channelArgs) + /// <param name="options">Channel options.</param> + public Channel(string host, int port, Credentials credentials = null, IEnumerable<ChannelOption> options = null) : + this(string.Format("{0}:{1}", host, port), credentials, options) { } @@ -112,22 +113,25 @@ namespace Grpc.Core } } - private static string GetOverridenTarget(string target, ChannelArgs args) + /// <summary> + /// Look for SslTargetNameOverride option and return its value instead of originalTarget + /// if found. + /// </summary> + private static string GetOverridenTarget(string originalTarget, IEnumerable<ChannelOption> options) { - if (args != null && !string.IsNullOrEmpty(args.GetSslTargetNameOverride())) + if (options == null) { - return args.GetSslTargetNameOverride(); + return originalTarget; } - return target; - } - - private static ChannelArgsSafeHandle CreateNativeChannelArgs(ChannelArgs args) - { - if (args == null) + foreach (var option in options) { - return ChannelArgsSafeHandle.CreateNull(); + if (option.Type == ChannelOption.OptionType.String + && option.Name == ChannelOptions.SslTargetNameOverride) + { + return option.StringValue; + } } - return args.ToNativeChannelArgs(); + return originalTarget; } } } diff --git a/src/csharp/Grpc.Core/ChannelArgs.cs b/src/csharp/Grpc.Core/ChannelArgs.cs deleted file mode 100644 index 74ab310e44..0000000000 --- a/src/csharp/Grpc.Core/ChannelArgs.cs +++ /dev/null @@ -1,115 +0,0 @@ -#region Copyright notice and license -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#endregion -using System; -using System.Collections.Generic; -using System.Collections.Immutable; -using System.Runtime.InteropServices; -using System.Threading; -using System.Threading.Tasks; -using Grpc.Core.Internal; - -namespace Grpc.Core -{ - /// <summary> - /// gRPC channel options. - /// </summary> - public class ChannelArgs - { - public const string SslTargetNameOverrideKey = "grpc.ssl_target_name_override"; - - readonly ImmutableDictionary<string, string> stringArgs; - - private ChannelArgs(ImmutableDictionary<string, string> stringArgs) - { - this.stringArgs = stringArgs; - } - - public string GetSslTargetNameOverride() - { - string result; - if (stringArgs.TryGetValue(SslTargetNameOverrideKey, out result)) - { - return result; - } - return null; - } - - public static Builder CreateBuilder() - { - return new Builder(); - } - - public class Builder - { - readonly Dictionary<string, string> stringArgs = new Dictionary<string, string>(); - - // TODO: AddInteger not supported yet. - public Builder AddString(string key, string value) - { - stringArgs.Add(key, value); - return this; - } - - public ChannelArgs Build() - { - return new ChannelArgs(stringArgs.ToImmutableDictionary()); - } - } - - /// <summary> - /// Creates native object for the channel arguments. - /// </summary> - /// <returns>The native channel arguments.</returns> - internal ChannelArgsSafeHandle ToNativeChannelArgs() - { - ChannelArgsSafeHandle nativeArgs = null; - try - { - nativeArgs = ChannelArgsSafeHandle.Create(stringArgs.Count); - int i = 0; - foreach (var entry in stringArgs) - { - nativeArgs.SetString(i, entry.Key, entry.Value); - i++; - } - return nativeArgs; - } - catch (Exception) - { - if (nativeArgs != null) - { - nativeArgs.Dispose(); - } - throw; - } - } - } -} diff --git a/src/csharp/Grpc.Core/ChannelOptions.cs b/src/csharp/Grpc.Core/ChannelOptions.cs new file mode 100644 index 0000000000..bc23bb59b1 --- /dev/null +++ b/src/csharp/Grpc.Core/ChannelOptions.cs @@ -0,0 +1,178 @@ +#region Copyright notice and license +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#endregion +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core +{ + /// <summary> + /// Channel option specified when creating a channel. + /// Corresponds to grpc_channel_args from grpc/grpc.h. + /// </summary> + public sealed class ChannelOption + { + public enum OptionType + { + Integer, + String + } + + private readonly OptionType type; + private readonly string name; + private readonly int intValue; + private readonly string stringValue; + + /// <summary> + /// Creates a channel option with a string value. + /// </summary> + /// <param name="name">Name.</param> + /// <param name="stringValue">String value.</param> + public ChannelOption(string name, string stringValue) + { + this.type = OptionType.String; + this.name = Preconditions.CheckNotNull(name); + this.stringValue = Preconditions.CheckNotNull(stringValue); + } + + /// <summary> + /// Creates a channel option with an integer value. + /// </summary> + /// <param name="name">Name.</param> + /// <param name="stringValue">String value.</param> + public ChannelOption(string name, int intValue) + { + this.type = OptionType.Integer; + this.name = Preconditions.CheckNotNull(name); + this.intValue = intValue; + } + + public OptionType Type + { + get + { + return type; + } + } + + public string Name + { + get + { + return name; + } + } + + public int IntValue + { + get + { + Preconditions.CheckState(type == OptionType.Integer); + return intValue; + } + } + + public string StringValue + { + get + { + Preconditions.CheckState(type == OptionType.String); + return stringValue; + } + } + } + + public static class ChannelOptions + { + // Override SSL target check. Only to be used for testing. + public const string SslTargetNameOverride = "grpc.ssl_target_name_override"; + + // Enable census for tracing and stats collection + public const string Census = "grpc.census"; + + // Maximum number of concurrent incoming streams to allow on a http2 connection + public const string MaxConcurrentStreams = "grpc.max_concurrent_streams"; + + // Maximum message length that the channel can receive + public const string MaxMessageLength = "grpc.max_message_length"; + + // Initial sequence number for http2 transports + public const string Http2InitialSequenceNumber = "grpc.http2.initial_sequence_number"; + + /// <summary> + /// Creates native object for a collection of channel options. + /// </summary> + /// <returns>The native channel arguments.</returns> + internal static ChannelArgsSafeHandle CreateChannelArgs(IEnumerable<ChannelOption> options) + { + if (options == null) + { + return ChannelArgsSafeHandle.CreateNull(); + } + var optionList = new List<ChannelOption>(options); // It's better to do defensive copy + ChannelArgsSafeHandle nativeArgs = null; + try + { + nativeArgs = ChannelArgsSafeHandle.Create(optionList.Count); + for (int i = 0; i < optionList.Count; i++) + { + var option = optionList[i]; + if (option.Type == ChannelOption.OptionType.Integer) + { + nativeArgs.SetInteger(i, option.Name, option.IntValue); + } + else if (option.Type == ChannelOption.OptionType.String) + { + nativeArgs.SetString(i, option.Name, option.StringValue); + } + else + { + throw new InvalidOperationException("Unknown option type"); + } + } + return nativeArgs; + } + catch (Exception) + { + if (nativeArgs != null) + { + nativeArgs.Dispose(); + } + throw; + } + } + } +} diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 5c7b9a8bb6..a36a6a5acc 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -5,8 +5,6 @@ <PropertyGroup> <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> - <ProductVersion>8.0.30703</ProductVersion> - <SchemaVersion>2.0</SchemaVersion> <ProjectGuid>{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}</ProjectGuid> <OutputType>Library</OutputType> <RootNamespace>Grpc.Core</RootNamespace> @@ -78,7 +76,6 @@ <Compile Include="Internal\CredentialsSafeHandle.cs" /> <Compile Include="Credentials.cs" /> <Compile Include="Internal\ChannelArgsSafeHandle.cs" /> - <Compile Include="ChannelArgs.cs" /> <Compile Include="Internal\AsyncCompletion.cs" /> <Compile Include="Internal\AsyncCallBase.cs" /> <Compile Include="Internal\AsyncCallServer.cs" /> @@ -103,6 +100,7 @@ <Compile Include="Internal\CompletionQueueEvent.cs" /> <Compile Include="Internal\CompletionRegistry.cs" /> <Compile Include="Internal\BatchContextSafeHandle.cs" /> + <Compile Include="ChannelOptions.cs" /> </ItemGroup> <ItemGroup> <None Include="packages.config" /> @@ -132,4 +130,4 @@ </Target> <Import Project="..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets" Condition="Exists('..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets')" /> <Import Project="..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets" Condition="Exists('..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets')" /> -</Project>
\ No newline at end of file +</Project> diff --git a/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs index c69f1a0d02..c12aec5a3a 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs @@ -45,6 +45,9 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)] static extern void grpcsharp_channel_args_set_string(ChannelArgsSafeHandle args, UIntPtr index, string key, string value); + [DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)] + static extern void grpcsharp_channel_args_set_integer(ChannelArgsSafeHandle args, UIntPtr index, string key, int value); + [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_channel_args_destroy(IntPtr args); @@ -67,6 +70,11 @@ namespace Grpc.Core.Internal grpcsharp_channel_args_set_string(this, new UIntPtr((uint)index), key, value); } + public void SetInteger(int index, string key, int value) + { + grpcsharp_channel_args_set_integer(this, new UIntPtr((uint)index), key, value); + } + protected override bool ReleaseHandle() { grpcsharp_channel_args_destroy(handle); diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index eda9afcb87..83dbb910aa 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -45,7 +45,7 @@ namespace Grpc.Core.Internal internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid { [DllImport("grpc_csharp_ext.dll")] - static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); + static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args); [DllImport("grpc_csharp_ext.dll")] static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); @@ -72,7 +72,7 @@ namespace Grpc.Core.Internal { } - public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, IntPtr args) + public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args) { return grpcsharp_server_create(cq, args); } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 3352fc93a1..8e818885d1 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -61,9 +61,16 @@ namespace Grpc.Core bool startRequested; bool shutdownRequested; - public Server() + /// <summary> + /// Create a new server. + /// </summary> + /// <param name="options">Channel options.</param> + public Server(IEnumerable<ChannelOption> options = null) { - this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); + using (var channelArgs = ChannelOptions.CreateChannelArgs(options)) + { + this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), channelArgs); + } } /// <summary> diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 66171fae57..f0be522bc6 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -110,14 +110,16 @@ namespace Grpc.IntegrationTesting credentials = TestCredentials.CreateTestClientCredentials(options.useTestCa); } - ChannelArgs channelArgs = null; + List<ChannelOption> channelOptions = null; if (!string.IsNullOrEmpty(options.serverHostOverride)) { - channelArgs = ChannelArgs.CreateBuilder() - .AddString(ChannelArgs.SslTargetNameOverrideKey, options.serverHostOverride).Build(); + channelOptions = new List<ChannelOption> + { + new ChannelOption(ChannelOptions.SslTargetNameOverride, options.serverHostOverride) + }; } - using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelArgs)) + using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions)) { var stubConfig = StubConfiguration.Default; if (options.testCase == "service_account_creds" || options.testCase == "compute_engine_creds") diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs index f756dfbc40..1a733450c1 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs @@ -62,10 +62,11 @@ namespace Grpc.IntegrationTesting int port = server.AddListeningPort(host, Server.PickUnusedPort, TestCredentials.CreateTestServerCredentials()); server.Start(); - var channelArgs = ChannelArgs.CreateBuilder() - .AddString(ChannelArgs.SslTargetNameOverrideKey, TestCredentials.DefaultHostOverride).Build(); - - channel = new Channel(host, port, TestCredentials.CreateTestClientCredentials(true), channelArgs); + var options = new List<ChannelOption> + { + new ChannelOption(ChannelOptions.SslTargetNameOverride, TestCredentials.DefaultHostOverride) + }; + channel = new Channel(host, port, TestCredentials.CreateTestClientCredentials(true), options); client = TestService.NewStub(channel); } diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index c2a0b729d4..f52633b9db 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -65,8 +65,6 @@ grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) { return bb; } -typedef void(GPR_CALLTYPE *callback_funcptr)(gpr_int32 success, void *batch_context); - /* * Helper to maintain lifetime of batch op inputs and store batch op outputs. */ @@ -355,6 +353,16 @@ grpcsharp_channel_args_set_string(grpc_channel_args *args, size_t index, } GPR_EXPORT void GPR_CALLTYPE +grpcsharp_channel_args_set_integer(grpc_channel_args *args, size_t index, + const char *key, int value) { + GPR_ASSERT(args); + GPR_ASSERT(index < args->num_args); + args->args[index].type = GRPC_ARG_INTEGER; + args->args[index].key = gpr_strdup(key); + args->args[index].value.integer = value; +} + +GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_args_destroy(grpc_channel_args *args) { size_t i; if (args) { @@ -722,10 +730,12 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_redirect_log(grpcsharp_log_func func) { gpr_set_log_function(grpcsharp_log_handler); } +typedef void(GPR_CALLTYPE *test_callback_funcptr)(gpr_int32 success); + /* For testing */ GPR_EXPORT void GPR_CALLTYPE -grpcsharp_test_callback(callback_funcptr callback) { - callback(1, NULL); +grpcsharp_test_callback(test_callback_funcptr callback) { + callback(1); } /* For testing */ diff --git a/src/php/README.md b/src/php/README.md index 40c79e0dd4..cb9b48aee3 100644 --- a/src/php/README.md +++ b/src/php/README.md @@ -7,51 +7,122 @@ This directory contains source code for PHP implementation of gRPC layered on sh Pre-Alpha : This gRPC PHP implementation is work-in-progress and is not expected to work yet. - -## LAYOUT - -Directory structure is as generated by the PHP utility -[ext_skel](http://php.net/manual/en/internals2.buildsys.skeleton.php) - ## ENVIRONMENT Install `php5` and `php5-dev`. -To run the tests, additionally install `php5-readline` and `phpunit`. +To run the tests, additionally install `phpunit`. Alternatively, build and install PHP 5.5 or later from source with standard configuration options. -To also download and install protoc and the PHP code generator. +## Build from Homebrew + +On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. Run the following command to +install gRPC. + +```sh +$ curl -fsSL https://goo.gl/getgrpc | bash -s php +``` + +This will download and run the [gRPC install script][] and compile the gRPC PHP extension. + +## Build from Source + +Clone this repository + +``` +$ git clone https://github.com/grpc/grpc.git +``` + +Build and install the Protocol Buffers compiler (protoc) + +``` +$ cd grpc +$ git pull --recurse-submodules && git submodule update --init --recursive +$ cd third_party/protobuf +$ ./autogen.sh +$ ./configure +$ make +$ make check +$ sudo make install +``` + +Build and install the gRPC C core + +```sh +$ cd grpc +$ make +$ sudo make install +``` + +Build the gRPC PHP extension -```bash -apt-get install -y procps -curl -sSL https://get.rvm.io | sudo bash -s stable --ruby -git clone git@github.com:google/protobuf.git -cd protobuf -./configure -make -make install -git clone git@github.com:murgatroid99/Protobuf-PHP.git -cd Protobuf-PHP -rake pear:package version=1.0 -pear install Protobuf-1.0.tgz +```sh +$ cd grpc/src/php/ext/grpc +$ phpize +$ ./configure +$ make +$ sudo make install ``` -## BUILDING +In your php.ini file, add the line `extension=grpc.so` to load the extension +at PHP startup. - 1. In ./ext/grpc, run the command `phpize` (distributed with PHP) - 2. Run `./ext/grpc/configure` - 3. In ./ext/grpc, run `make` and `sudo make install` - 4. In your php.ini file, add the line `extension=grpc.so` to load the - extension at PHP startup. +Install Composer -## PHPUnit +```sh +$ cd grpc/src/php +$ curl -sS https://getcomposer.org/installer | php +$ php composer.phar install +``` + +## Unit Tests + +Run unit tests + +```sh +$ cd grpc/src/php +$ ./bin/run_tests.sh +``` + +## Generated Code Tests + +Install `protoc-gen-php` + +```sh +$ cd grpc/src/php/vendor/datto/protobuf-php +$ gem install rake ronn +$ rake pear:package version=1.0 +$ sudo pear install Protobuf-1.0.tgz +``` + +Generate client stub code + +```sh +$ cd grpc/src/php +$ ./bin/generate_proto_php.sh +``` + +Run a local server serving the math services + + - Please see [Node][] on how to run an example server + +```sh +$ cd grpc/src/node +$ npm install +$ nodejs examples/math_server.js +``` + +Run the generated code tests + +```sh +$ cd grpc/src/php +$ ./bin/run_gen_code_test.sh +``` -This repo now has PHPUnit tests, which can by run by executing -`./bin/run_tests.sh` after building. +[homebrew]:http://brew.sh +[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation +[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install +[Node]:https://github.com/grpc/grpc/tree/master/src/node/examples -There is also a generated code test (`./bin/run_gen_code_test.sh`), which tests -the stub `./tests/generated_code/math.php` against a running localhost server -serving the math service. That stub is generated from -`./tests/generated_code/math.proto`. diff --git a/src/php/bin/run_gen_code_test.sh b/src/php/bin/run_gen_code_test.sh index 4882a2b846..1be2ed3f72 100755 --- a/src/php/bin/run_gen_code_test.sh +++ b/src/php/bin/run_gen_code_test.sh @@ -30,8 +30,8 @@ cd $(dirname $0) GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \ - -d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \ + -d extension=grpc.so `which phpunit` -v --debug --strict \ ../tests/generated_code/GeneratedCodeTest.php GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \ - -d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \ + -d extension=grpc.so `which phpunit` -v --debug --strict \ ../tests/generated_code/GeneratedCodeWithCallbackTest.php diff --git a/src/php/composer.json b/src/php/composer.json index ba7a1302f2..b0115bdadd 100644 --- a/src/php/composer.json +++ b/src/php/composer.json @@ -4,8 +4,15 @@ "version": "0.5.0", "homepage": "http://grpc.io", "license": "BSD-3-Clause", + "repositories": [ + { + "type": "vcs", + "url": "https://github.com/stanley-cheung/Protobuf-PHP" + } + ], "require": { "php": ">=5.5.0", + "datto/protobuf-php": "dev-master", "google/auth": "dev-master" }, "autoload": { diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php index 2d2352b199..6102aaf0a8 100644 --- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php +++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php @@ -32,8 +32,6 @@ * */ require_once realpath(dirname(__FILE__) . '/../../vendor/autoload.php'); -require 'DrSlump/Protobuf.php'; -\DrSlump\Protobuf::autoload(); require 'math.php'; abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase { /* These tests require that a server exporting the math service must be diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php index bf8d0cd93c..9aee01cd4d 100755 --- a/src/php/tests/interop/interop_client.php +++ b/src/php/tests/interop/interop_client.php @@ -32,8 +32,6 @@ * */ require_once realpath(dirname(__FILE__) . '/../../vendor/autoload.php'); -require 'DrSlump/Protobuf.php'; -\DrSlump\Protobuf::autoload(); require 'empty.php'; require 'message_set.php'; require 'messages.php'; diff --git a/src/python/README.md b/src/python/README.md index 0bca457a33..2beb3a913a 100644 --- a/src/python/README.md +++ b/src/python/README.md @@ -20,6 +20,10 @@ $ curl -fsSL https://goo.gl/getgrpc | bash -s python ``` This will download and run the [gRPC install script][], then install the latest version of the gRPC Python package. It also installs the Protocol Buffers compiler (_protoc_) and the gRPC _protoc_ plugin for python. +EXAMPLES +-------- +Please read our online documentation for a [Quick Start][] and a [detailed example][] + BUILDING FROM SOURCE --------------------- - Clone this repository @@ -58,3 +62,5 @@ $ ../../tools/distrib/python/submit.py [homebrew]:http://brew.sh [linuxbrew]:https://github.com/Homebrew/linuxbrew#installation [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install +[Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html +[detailed example]:http://www.grpc.io/docs/installation/python.html diff --git a/src/python/src/README.rst b/src/python/src/README.rst index bc1815febc..00bdecf56f 100644 --- a/src/python/src/README.rst +++ b/src/python/src/README.rst @@ -6,22 +6,18 @@ Package for GRPC Python. Dependencies ------------ -Ensure that you have installed GRPC core. - -On debian linux systems, install from our released deb package: +Ensure you have installed the gRPC core. On Mac OS X, install homebrew_. On Linux, install linuxbrew_. +Run the following command to install gRPC Python. :: - $ wget https://github.com/grpc/grpc/releases/download/release-0_5_0/libgrpc_0.5.0_amd64.deb - $ wget https://github.com/grpc/grpc/releases/download/release-0_5_0/libgrpc-dev_0.5.0_amd64.deb - $ sudo dpkg -i libgrpc_0.5.0_amd64.deb libgrpc-dev_0.5.0_amd64.deb - -Otherwise, install from source: + $ curl -fsSL https://goo.gl/getgrpc | bash -s python -:: +This will download and run the [gRPC install script][] to install grpc core. The script then uses pip to install this package. It also installs the Protocol Buffers compiler (_protoc_) and the gRPC _protoc_ plugin for python. - git clone https://github.com/grpc/grpc.git - cd grpc - ./configure - make && make install +Otherwise, `install from source`_ +.. _`install from source`: https://github.com/grpc/grpc/blob/master/src/python/README.md#building-from-source +.. _homebrew: http://brew.sh +.. _linuxbrew: https://github.com/Homebrew/linuxbrew#installation +.. _`gRPC install script`: https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install diff --git a/src/python/src/grpc/_adapter/_c/utility.c b/src/python/src/grpc/_adapter/_c/utility.c index 6ad3955986..651ed3b0a2 100644 --- a/src/python/src/grpc/_adapter/_c/utility.c +++ b/src/python/src/grpc/_adapter/_c/utility.c @@ -40,7 +40,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/slice.h> #include <grpc/support/time.h> -#include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "grpc/_adapter/_c/types.h" @@ -158,9 +158,10 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) { return 0; } if (PyTuple_Size(op) != OP_TUPLE_SIZE) { - char buf[64]; - snprintf(buf, sizeof(buf), "expected tuple op of length %d", OP_TUPLE_SIZE); + char *buf; + gpr_asprintf(&buf, "expected tuple op of length %d", OP_TUPLE_SIZE); PyErr_SetString(PyExc_ValueError, buf); + gpr_free(buf); return 0; } type = PyInt_AsLong(PyTuple_GET_ITEM(op, TYPE_INDEX)); @@ -355,9 +356,14 @@ double pygrpc_cast_gpr_timespec_to_double(gpr_timespec timespec) { return timespec.tv_sec + 1e-9*timespec.tv_nsec; } +/* Because C89 doesn't have a way to check for infinity... */ +static int pygrpc_isinf(double x) { + return x * 0 != 0; +} + gpr_timespec pygrpc_cast_double_to_gpr_timespec(double seconds) { gpr_timespec result; - if isinf(seconds) { + if (pygrpc_isinf(seconds)) { result = seconds > 0.0 ? gpr_inf_future : gpr_inf_past; } else { result.tv_sec = (time_t)seconds; diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py index 367effdb1a..0e58d912b9 100644 --- a/test/compiler/python_plugin_test.py +++ b/test/compiler/python_plugin_test.py @@ -36,6 +36,7 @@ import shutil import subprocess import sys import tempfile +import threading import time import unittest @@ -49,13 +50,13 @@ STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub' SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server' STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub' -# Timeouts and delays. -SHORT_TIMEOUT = 0.1 -NORMAL_TIMEOUT = 1 -LONG_TIMEOUT = 2 -DOES_NOT_MATTER_DELAY = 0 +# The timeout used in tests of RPCs that are supposed to expire. +SHORT_TIMEOUT = 2 +# The timeout used in tests of RPCs that are not supposed to expire. The +# absurdly large value doesn't matter since no passing execution of this test +# module will ever wait the duration. +LONG_TIMEOUT = 600 NO_DELAY = 0 -LONG_DELAY = 1 # Build mode environment variable set by tools/run_tests/run_tests.py. _build_mode = os.environ['CONFIG'] @@ -64,47 +65,54 @@ _build_mode = os.environ['CONFIG'] class _ServicerMethods(object): def __init__(self, test_pb2, delay): + self._condition = threading.Condition() + self._delay = delay self._paused = False - self._failed = False - self.test_pb2 = test_pb2 - self.delay = delay + self._fail = False + self._test_pb2 = test_pb2 @contextlib.contextmanager def pause(self): # pylint: disable=invalid-name - self._paused = True + with self._condition: + self._paused = True yield - self._paused = False + with self._condition: + self._paused = False + self._condition.notify_all() @contextlib.contextmanager def fail(self): # pylint: disable=invalid-name - self._failed = True + with self._condition: + self._fail = True yield - self._failed = False + with self._condition: + self._fail = False def _control(self): # pylint: disable=invalid-name - if self._failed: - raise ValueError() - time.sleep(self.delay) - while self._paused: - time.sleep(0) - - def UnaryCall(self, request, unused_context): - response = self.test_pb2.SimpleResponse() - response.payload.payload_type = self.test_pb2.COMPRESSABLE + with self._condition: + if self._fail: + raise ValueError() + while self._paused: + self._condition.wait() + time.sleep(self._delay) + + def UnaryCall(self, request, unused_rpc_context): + response = self._test_pb2.SimpleResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * request.response_size self._control() return response - def StreamingOutputCall(self, request, unused_context): + def StreamingOutputCall(self, request, unused_rpc_context): for parameter in request.response_parameters: - response = self.test_pb2.StreamingOutputCallResponse() - response.payload.payload_type = self.test_pb2.COMPRESSABLE + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * parameter.size self._control() yield response - def StreamingInputCall(self, request_iter, unused_context): - response = self.test_pb2.StreamingInputCallResponse() + def StreamingInputCall(self, request_iter, unused_rpc_context): + response = self._test_pb2.StreamingInputCallResponse() aggregated_payload_size = 0 for request in request_iter: aggregated_payload_size += len(request.payload.payload_compressable) @@ -112,21 +120,21 @@ class _ServicerMethods(object): self._control() return response - def FullDuplexCall(self, request_iter, unused_context): + def FullDuplexCall(self, request_iter, unused_rpc_context): for request in request_iter: for parameter in request.response_parameters: - response = self.test_pb2.StreamingOutputCallResponse() - response.payload.payload_type = self.test_pb2.COMPRESSABLE + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * parameter.size self._control() yield response - def HalfDuplexCall(self, request_iter, unused_context): + def HalfDuplexCall(self, request_iter, unused_rpc_context): responses = [] for request in request_iter: for parameter in request.response_parameters: - response = self.test_pb2.StreamingOutputCallResponse() - response.payload.payload_type = self.test_pb2.COMPRESSABLE + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * parameter.size self._control() responses.append(response) @@ -147,12 +155,11 @@ def _CreateService(test_pb2, delay): waiting for the service. Args: - test_pb2: the test_pb2 module generated by this test - delay: delay in seconds per response from the servicer - timeout: how long the stub will wait for the servicer by default. + test_pb2: The test_pb2 module generated by this test. + delay: Delay in seconds per response from the servicer. Yields: - A three-tuple (servicer_methods, servicer, stub), where the servicer is + A (servicer_methods, servicer, stub) three-tuple where servicer_methods is the back-end of the service bound to the stub and the server and stub are both activated and ready for use. """ @@ -185,7 +192,7 @@ def _CreateService(test_pb2, delay): yield servicer_methods, stub, server -def StreamingInputRequest(test_pb2): +def _streaming_input_request_iterator(test_pb2): for _ in range(3): request = test_pb2.StreamingInputCallRequest() request.payload.payload_type = test_pb2.COMPRESSABLE @@ -193,7 +200,7 @@ def StreamingInputRequest(test_pb2): yield request -def StreamingOutputRequest(test_pb2): +def _streaming_output_request(test_pb2): request = test_pb2.StreamingOutputCallRequest() sizes = [1, 2, 3] request.response_parameters.add(size=sizes[0], interval_us=0) @@ -202,7 +209,7 @@ def StreamingOutputRequest(test_pb2): return request -def FullDuplexRequest(test_pb2): +def _full_duplex_request_iterator(test_pb2): request = test_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request @@ -250,7 +257,7 @@ class PythonPluginTest(unittest.TestCase): if exc.errno != errno.ENOENT: raise - # TODO(atash): Figure out which of theses tests is hanging flakily with small + # TODO(atash): Figure out which of these tests is hanging flakily with small # probability. def testImportAttributes(self): @@ -265,37 +272,36 @@ class PythonPluginTest(unittest.TestCase): def testUpDown(self): import test_pb2 with _CreateService( - test_pb2, DOES_NOT_MATTER_DELAY) as (servicer, stub, unused_server): + test_pb2, NO_DELAY) as (servicer, stub, unused_server): request = test_pb2.SimpleRequest(response_size=13) def testUnaryCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. request = test_pb2.SimpleRequest(response_size=13) - response = stub.UnaryCall(request, NORMAL_TIMEOUT) - expected_response = servicer.UnaryCall(request, None) + response = stub.UnaryCall(request, timeout) + expected_response = methods.UnaryCall(request, 'not a real RpcContext!') self.assertEqual(expected_response, response) def testUnaryCallAsync(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, LONG_DELAY) as ( - servicer, stub, unused_server): - start_time = time.clock() - response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) - # Check that we didn't block on the asynchronous call. - self.assertGreater(LONG_DELAY, time.clock() - start_time) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + # Check that the call does not block waiting for the server to respond. + with methods.pause(): + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) response = response_future.result() - expected_response = servicer.UnaryCall(request, None) + expected_response = methods.UnaryCall(request, 'not a real RpcContext!') self.assertEqual(expected_response, response) def testUnaryCallAsyncExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - # set the timeout super low... - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): request = test_pb2.SimpleRequest(response_size=13) - with servicer.pause(): + with methods.pause(): response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): response_future.result() @@ -305,9 +311,9 @@ class PythonPluginTest(unittest.TestCase): def testUnaryCallAsyncCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): response_future = stub.UnaryCall.async(request, 1) response_future.cancel() self.assertTrue(response_future.cancelled()) @@ -315,30 +321,31 @@ class PythonPluginTest(unittest.TestCase): def testUnaryCallAsyncFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.fail(): - response_future = stub.UnaryCall.async(request, NORMAL_TIMEOUT) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) self.assertIsNotNone(response_future.exception()) def testStreamingOutputCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = StreamingOutputRequest(test_pb2) - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): - responses = stub.StreamingOutputCall(request, NORMAL_TIMEOUT) - expected_responses = servicer.StreamingOutputCall(request, None) - for check in itertools.izip_longest(expected_responses, responses): - expected_response, response = check + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + responses = stub.StreamingOutputCall(request, LONG_TIMEOUT) + expected_responses = methods.StreamingOutputCall( + request, 'not a real RpcContext!') + for expected_response, response in itertools.izip_longest( + expected_responses, responses): self.assertEqual(expected_response, response) @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' 'forever and fix.') def testStreamingOutputCallExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = StreamingOutputRequest(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): list(responses) @@ -347,9 +354,9 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testStreamingOutputCallCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = StreamingOutputRequest(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - unused_servicer, stub, unused_server): + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + unused_methods, stub, unused_server): responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) next(responses) responses.cancel() @@ -360,10 +367,10 @@ class PythonPluginTest(unittest.TestCase): 'instead of raising the proper error.') def testStreamingOutputCallFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = StreamingOutputRequest(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.fail(): + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): responses = stub.StreamingOutputCall(request, 1) self.assertIsNotNone(responses) with self.assertRaises(exceptions.ServicerError): @@ -373,34 +380,32 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testStreamingInputCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): - response = stub.StreamingInputCall(StreamingInputRequest(test_pb2), - NORMAL_TIMEOUT) - expected_response = servicer.StreamingInputCall( - StreamingInputRequest(test_pb2), None) + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + response = stub.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) + expected_response = methods.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') self.assertEqual(expected_response, response) def testStreamingInputCallAsync(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, LONG_DELAY) as ( - servicer, stub, unused_server): - start_time = time.clock() - response_future = stub.StreamingInputCall.async( - StreamingInputRequest(test_pb2), LONG_TIMEOUT) - self.assertGreater(LONG_DELAY, time.clock() - start_time) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) response = response_future.result() - expected_response = servicer.StreamingInputCall( - StreamingInputRequest(test_pb2), None) + expected_response = methods.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') self.assertEqual(expected_response, response) def testStreamingInputCallAsyncExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - # set the timeout super low... - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): response_future = stub.StreamingInputCall.async( - StreamingInputRequest(test_pb2), SHORT_TIMEOUT) + _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): response_future.result() self.assertIsInstance( @@ -408,11 +413,12 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCallAsyncCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. response_future = stub.StreamingInputCall.async( - StreamingInputRequest(test_pb2), NORMAL_TIMEOUT) + _streaming_input_request_iterator(test_pb2), timeout) response_future.cancel() self.assertTrue(response_future.cancelled()) with self.assertRaises(future.CancelledError): @@ -420,33 +426,33 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCallAsyncFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.fail(): + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): response_future = stub.StreamingInputCall.async( - StreamingInputRequest(test_pb2), SHORT_TIMEOUT) + _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) self.assertIsNotNone(response_future.exception()) def testFullDuplexCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): - responses = stub.FullDuplexCall(FullDuplexRequest(test_pb2), - NORMAL_TIMEOUT) - expected_responses = servicer.FullDuplexCall(FullDuplexRequest(test_pb2), - None) - for check in itertools.izip_longest(expected_responses, responses): - expected_response, response = check + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + responses = stub.FullDuplexCall( + _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT) + expected_responses = methods.FullDuplexCall( + _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!') + for expected_response, response in itertools.izip_longest( + expected_responses, responses): self.assertEqual(expected_response, response) @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' 'forever and fix.') def testFullDuplexCallExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = FullDuplexRequest(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): - responses = stub.FullDuplexCall(request, SHORT_TIMEOUT) + request_iterator = _full_duplex_request_iterator(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): list(responses) @@ -454,9 +460,9 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testFullDuplexCallCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): - request = FullDuplexRequest(test_pb2) - responses = stub.FullDuplexCall(request, NORMAL_TIMEOUT) + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + request_iterator = _full_duplex_request_iterator(test_pb2) + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) next(responses) responses.cancel() with self.assertRaises(future.CancelledError): @@ -466,11 +472,11 @@ class PythonPluginTest(unittest.TestCase): 'and fix.') def testFullDuplexCallFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = FullDuplexRequest(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.fail(): - responses = stub.FullDuplexCall(request, NORMAL_TIMEOUT) + request_iterator = _full_duplex_request_iterator(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) self.assertIsNotNone(responses) with self.assertRaises(exceptions.ServicerError): next(responses) @@ -479,9 +485,9 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testHalfDuplexCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - def HalfDuplexRequest(): + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + def half_duplex_request_iterator(): request = test_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request @@ -489,30 +495,38 @@ class PythonPluginTest(unittest.TestCase): request.response_parameters.add(size=2, interval_us=0) request.response_parameters.add(size=3, interval_us=0) yield request - responses = stub.HalfDuplexCall(HalfDuplexRequest(), NORMAL_TIMEOUT) - expected_responses = servicer.HalfDuplexCall(HalfDuplexRequest(), None) + responses = stub.HalfDuplexCall( + half_duplex_request_iterator(), LONG_TIMEOUT) + expected_responses = methods.HalfDuplexCall( + half_duplex_request_iterator(), 'not a real RpcContext!') for check in itertools.izip_longest(expected_responses, responses): expected_response, response = check self.assertEqual(expected_response, response) def testHalfDuplexCallWedged(self): import test_pb2 # pylint: disable=g-import-not-at-top - wait_flag = [False] + condition = threading.Condition() + wait_cell = [False] @contextlib.contextmanager def wait(): # pylint: disable=invalid-name # Where's Python 3's 'nonlocal' statement when you need it? - wait_flag[0] = True + with condition: + wait_cell[0] = True yield - wait_flag[0] = False - def HalfDuplexRequest(): + with condition: + wait_cell[0] = False + condition.notify_all() + def half_duplex_request_iterator(): request = test_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request - while wait_flag[0]: - time.sleep(0.1) - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): + with condition: + while wait_cell[0]: + condition.wait() + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): with wait(): - responses = stub.HalfDuplexCall(HalfDuplexRequest(), NORMAL_TIMEOUT) + responses = stub.HalfDuplexCall( + half_duplex_request_iterator(), SHORT_TIMEOUT) # half-duplex waits for the client to send all info with self.assertRaises(exceptions.ExpirationError): next(responses) diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 921836e201..1b7a8d26b2 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -62,7 +62,7 @@ typedef std::list<grpc_time> deadline_list; class ClientRpcContext { public: - ClientRpcContext(int ch) : channel_id_(ch) {} + explicit ClientRpcContext(int ch) : channel_id_(ch) {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate virtual bool RunNextState(bool, Histogram* hist) = 0; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 4b0678bb2c..210aef4fd6 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -73,31 +73,35 @@ class AsyncQpsServerTest : public Server { gpr_free(server_address); builder.RegisterAsyncService(&async_service_); - srv_cq_ = builder.AddCompletionQueue(); + for (int i = 0; i < config.threads(); i++) { + srv_cqs_.emplace_back(std::move(builder.AddCompletionQueue())); + } server_ = builder.BuildAndStart(); using namespace std::placeholders; - request_unary_ = - std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_, - _1, _2, _3, srv_cq_.get(), srv_cq_.get(), _4); - request_streaming_ = - std::bind(&TestService::AsyncService::RequestStreamingCall, - &async_service_, _1, _2, srv_cq_.get(), srv_cq_.get(), _3); - for (int i = 0; i < 100; i++) { - contexts_.push_front( - new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - request_unary_, ProcessRPC)); - contexts_.push_front( - new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - request_streaming_, ProcessRPC)); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < config.threads(); j++) { + auto request_unary = std::bind( + &TestService::AsyncService::RequestUnaryCall, &async_service_, _1, + _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); + auto request_streaming = std::bind( + &TestService::AsyncService::RequestStreamingCall, &async_service_, + _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); + contexts_.push_front( + new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( + request_unary, ProcessRPC)); + contexts_.push_front( + new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( + request_streaming, ProcessRPC)); + } } for (int i = 0; i < config.threads(); i++) { threads_.push_back(std::thread([=]() { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cq_->Next(&got_tag, &ok)) { + while (srv_cqs_[i]->Next(&got_tag, &ok)) { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke bool still_going = ctx->RunNextState(ok); @@ -125,11 +129,13 @@ class AsyncQpsServerTest : public Server { for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); } - srv_cq_->Shutdown(); - bool ok; - void *got_tag; - while (srv_cq_->Next(&got_tag, &ok)) - ; + for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { + (*cq)->Shutdown(); + bool ok; + void *got_tag; + while ((*cq)->Next(&got_tag, &ok)) + ; + } while (!contexts_.empty()) { delete contexts_.front(); contexts_.pop_front(); @@ -306,15 +312,8 @@ class AsyncQpsServerTest : public Server { } std::vector<std::thread> threads_; std::unique_ptr<grpc::Server> server_; - std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_; + std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_; TestService::AsyncService async_service_; - std::function<void(ServerContext *, SimpleRequest *, - grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)> - request_unary_; - std::function<void( - ServerContext *, - grpc::ServerAsyncReaderWriter<SimpleResponse, SimpleRequest> *, void *)> - request_streaming_; std::forward_list<ServerRpcContext *> contexts_; std::mutex shutdown_mutex_; diff --git a/tools/jenkins/run_jenkins.sh b/tools/jenkins/run_jenkins.sh index a95819af60..57cec77424 100755 --- a/tools/jenkins/run_jenkins.sh +++ b/tools/jenkins/run_jenkins.sh @@ -41,13 +41,35 @@ if [ "$platform" == "linux" ] then echo "building $language on Linux" + if [ "$ghprbPullId" != "" ] + then + # if we are building a pull request, grab corresponding refs. + FETCH_PULL_REQUEST_CMD="&& git fetch $GIT_URL refs/pull/$ghprbPullId/merge refs/pull/$ghprbPullId/head" + fi + + # Make sure the CID file is gone. + rm -f docker.cid + # Run tests inside docker - docker run grpc/grpc_jenkins_slave bash -c -l "git clone --recursive $GIT_URL /var/local/git/grpc \ - && cd /var/local/git/grpc && git checkout -f $GIT_COMMIT \ + docker run --cidfile=docker.cid grpc/grpc_jenkins_slave bash -c -l "git clone --recursive $GIT_URL /var/local/git/grpc \ + && cd /var/local/git/grpc \ + $FETCH_PULL_REQUEST_CMD \ + && git checkout -f $GIT_COMMIT \ && git submodule update \ && nvm use 0.12 \ && rvm use ruby-2.1 \ - && tools/run_tests/run_tests.py -t -l $language" + && tools/run_tests/run_tests.py -t -l $language" || DOCKER_FAILED="true" + + DOCKER_CID=`cat docker.cid` + if [ "$DOCKER_FAILED" == "" ] + then + echo "Docker finished successfully, deleting the container $DOCKER_CID" + docker rm $DOCKER_CID + else + echo "Docker exited with failure, keeping container $DOCKER_CID." + echo "You can SSH to the worker and use 'docker start CID' and 'docker exec -i -t CID bash' to debug the problem." + fi + elif [ "$platform" == "windows" ] then echo "building $language on Windows" diff --git a/tools/run_tests/build_python.sh b/tools/run_tests/build_python.sh index d0f09e4d8b..53db6af0ea 100755 --- a/tools/run_tests/build_python.sh +++ b/tools/run_tests/build_python.sh @@ -38,5 +38,5 @@ rm -rf python2.7_virtual_environment virtualenv -p /usr/bin/python2.7 python2.7_virtual_environment source python2.7_virtual_environment/bin/activate pip install -r src/python/requirements.txt -CFLAGS=-I$root/include LDFLAGS=-L$root/libs/$CONFIG pip install src/python/src +CFLAGS="-I$root/include -std=c89" LDFLAGS=-L$root/libs/$CONFIG pip install src/python/src pip install src/python/interop diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py index 985b7a7f16..058a30d1ce 100755 --- a/tools/run_tests/jobset.py +++ b/tools/run_tests/jobset.py @@ -223,6 +223,7 @@ class Jobset(object): self._travis = travis self._cache = cache self._stop_on_failure = stop_on_failure + self._hashes = {} def start(self, spec): """Start a job. Return True on success, False on failure.""" @@ -231,11 +232,15 @@ class Jobset(object): self.reap() if self.cancelled(): return False if spec.hash_targets: - bin_hash = hashlib.sha1() - for fn in spec.hash_targets: - with open(which(fn)) as f: - bin_hash.update(f.read()) - bin_hash = bin_hash.hexdigest() + if spec.identity() in self._hashes: + bin_hash = self._hashes[spec.identity()] + else: + bin_hash = hashlib.sha1() + for fn in spec.hash_targets: + with open(which(fn)) as f: + bin_hash.update(f.read()) + bin_hash = bin_hash.hexdigest() + self._hashes[spec.identity()] = bin_hash should_run = self._cache.should_run(spec.identity(), bin_hash) else: bin_hash = None @@ -266,6 +271,7 @@ class Jobset(object): for job in self._running: job.kill() dead.add(job) + break for job in dead: self._completed += 1 self._running.remove(job) diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 32405675b6..ea40d7e990 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -313,7 +313,7 @@ _CONFIGS = { 'dbg': SimpleConfig('dbg'), 'opt': SimpleConfig('opt'), 'tsan': SimpleConfig('tsan', environ={ - 'TSAN_OPTIONS': 'suppressions=tools/tsan_suppressions.txt'}), + 'TSAN_OPTIONS': 'suppressions=tools/tsan_suppressions.txt:halt_on_error=1'}), 'msan': SimpleConfig('msan'), 'ubsan': SimpleConfig('ubsan'), 'asan': SimpleConfig('asan', environ={ @@ -449,6 +449,7 @@ class TestCache(object): def __init__(self, use_cache_results): self._last_successful_run = {} self._use_cache_results = use_cache_results + self._last_save = time.time() def should_run(self, cmdline, bin_hash): if cmdline not in self._last_successful_run: @@ -461,7 +462,8 @@ class TestCache(object): def finished(self, cmdline, bin_hash): self._last_successful_run[cmdline] = bin_hash - self.save() + if time.time() - self._last_save > 1: + self.save() def dump(self): return [{'cmdline': k, 'hash': v} @@ -473,6 +475,7 @@ class TestCache(object): def save(self): with open('.run_tests_cache', 'w') as f: f.write(json.dumps(self.dump())) + self._last_save = time.time() def maybe_load(self): if os.path.exists('.run_tests_cache'): @@ -515,6 +518,8 @@ def _build_and_run(check_cancelled, newline_on_success, travis, cache): for antagonist in antagonists: antagonist.kill() + if cache: cache.save() + return 0 |