aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/csharp/GrpcApiTests/MathClientServerTests.cs8
-rw-r--r--src/csharp/GrpcCore/Channel.cs7
-rw-r--r--src/csharp/GrpcCore/GrpcCore.csproj1
-rw-r--r--src/csharp/GrpcCore/GrpcEnvironment.cs67
-rw-r--r--src/csharp/GrpcCore/Server.cs4
-rw-r--r--src/csharp/GrpcCore/Utils/PortPicker.cs50
-rw-r--r--src/csharp/GrpcCore/Utils/RecordingQueue.cs1
-rw-r--r--src/csharp/GrpcCoreTests/ClientServerTest.cs8
-rw-r--r--src/csharp/GrpcCoreTests/GrpcEnvironmentTest.cs24
-rw-r--r--src/csharp/GrpcCoreTests/ServerTest.cs6
-rw-r--r--src/csharp/InteropClient/Client.cs2
-rw-r--r--src/csharp/MathClient/MathClient.cs2
-rw-r--r--test/core/fling/client.c85
-rw-r--r--test/core/fling/server.c192
-rwxr-xr-xtools/gce_setup/grpc_docker.sh4
15 files changed, 304 insertions, 157 deletions
diff --git a/src/csharp/GrpcApiTests/MathClientServerTests.cs b/src/csharp/GrpcApiTests/MathClientServerTests.cs
index f5c74573de..bb3f75d4ac 100644
--- a/src/csharp/GrpcApiTests/MathClientServerTests.cs
+++ b/src/csharp/GrpcApiTests/MathClientServerTests.cs
@@ -46,7 +46,7 @@ namespace math.Tests
/// </summary>
public class MathClientServerTest
{
- string serverAddr = "localhost:" + PortPicker.PickUnusedPort();
+ string host = "localhost";
Server server;
Channel channel;
MathGrpc.IMathServiceClient client;
@@ -54,11 +54,13 @@ namespace math.Tests
[TestFixtureSetUp]
public void Init()
{
+ GrpcEnvironment.Initialize();
+
server = new Server();
server.AddServiceDefinition(MathGrpc.BindService(new MathServiceImpl()));
- server.AddPort(serverAddr);
+ int port = server.AddPort(host + ":0");
server.Start();
- channel = new Channel(serverAddr);
+ channel = new Channel(host + ":" + port);
client = MathGrpc.NewStub(channel);
}
diff --git a/src/csharp/GrpcCore/Channel.cs b/src/csharp/GrpcCore/Channel.cs
index 242e2b621a..cd4f151f49 100644
--- a/src/csharp/GrpcCore/Channel.cs
+++ b/src/csharp/GrpcCore/Channel.cs
@@ -41,13 +41,6 @@ namespace Google.GRPC.Core
{
public class Channel : IDisposable
{
- /// <summary>
- /// Make sure GPRC environment is initialized before any channels get used.
- /// </summary>
- static Channel() {
- GrpcEnvironment.EnsureInitialized();
- }
-
readonly ChannelSafeHandle handle;
readonly String target;
diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj
index 95df890917..34b9f6dfb8 100644
--- a/src/csharp/GrpcCore/GrpcCore.csproj
+++ b/src/csharp/GrpcCore/GrpcCore.csproj
@@ -61,7 +61,6 @@
<Compile Include="Marshaller.cs" />
<Compile Include="ServerServiceDefinition.cs" />
<Compile Include="Utils\RecordingObserver.cs" />
- <Compile Include="Utils\PortPicker.cs" />
<Compile Include="Utils\RecordingQueue.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
diff --git a/src/csharp/GrpcCore/GrpcEnvironment.cs b/src/csharp/GrpcCore/GrpcEnvironment.cs
index 2febb56d89..ee1168621d 100644
--- a/src/csharp/GrpcCore/GrpcEnvironment.cs
+++ b/src/csharp/GrpcCore/GrpcEnvironment.cs
@@ -38,11 +38,9 @@ using System.Runtime.InteropServices;
namespace Google.GRPC.Core
{
/// <summary>
- /// Encapsulates initialization and shutdown of GRPC C core library.
- /// You should not need to initialize it manually, as static constructors
- /// should load the library when needed.
+ /// Encapsulates initialization and shutdown of gRPC library.
/// </summary>
- public static class GrpcEnvironment
+ public class GrpcEnvironment
{
const int THREAD_POOL_SIZE = 1;
@@ -53,21 +51,24 @@ namespace Google.GRPC.Core
static extern void grpcsharp_shutdown();
static object staticLock = new object();
- static bool initCalled = false;
- static bool shutdownCalled = false;
-
- static GrpcThreadPool threadPool = new GrpcThreadPool(THREAD_POOL_SIZE);
+ static volatile GrpcEnvironment instance;
+
+ readonly GrpcThreadPool threadPool;
+ bool isClosed;
/// <summary>
- /// Makes sure GRPC environment is initialized.
+ /// Makes sure GRPC environment is initialized. Subsequent invocations don't have any
+ /// effect unless you call Shutdown first.
+ /// Although normal use cases assume you will call this just once in your application's
+ /// lifetime (and call Shutdown once you're done), for the sake of easier testing it's
+ /// allowed to initialize the environment again after it has been successfully shutdown.
/// </summary>
- public static void EnsureInitialized() {
+ public static void Initialize() {
lock(staticLock)
{
- if (!initCalled)
+ if (instance == null)
{
- initCalled = true;
- GrpcInit();
+ instance = new GrpcEnvironment();
}
}
}
@@ -80,45 +81,55 @@ namespace Google.GRPC.Core
{
lock(staticLock)
{
- if (initCalled && !shutdownCalled)
+ if (instance != null)
{
- shutdownCalled = true;
- GrpcShutdown();
+ instance.Close();
+ instance = null;
}
}
+ }
+ internal static GrpcThreadPool ThreadPool
+ {
+ get
+ {
+ var inst = instance;
+ if (inst == null)
+ {
+ throw new InvalidOperationException("GRPC environment not initialized");
+ }
+ return inst.threadPool;
+ }
}
/// <summary>
- /// Initializes GRPC C Core library.
+ /// Creates gRPC environment.
/// </summary>
- private static void GrpcInit()
+ private GrpcEnvironment()
{
grpcsharp_init();
+ threadPool = new GrpcThreadPool(THREAD_POOL_SIZE);
threadPool.Start();
// TODO: use proper logging here
Console.WriteLine("GRPC initialized.");
}
/// <summary>
- /// Shutdown GRPC C Core library.
+ /// Shuts down this environment.
/// </summary>
- private static void GrpcShutdown()
+ private void Close()
{
+ if (isClosed)
+ {
+ throw new InvalidOperationException("Close has already been called");
+ }
threadPool.Stop();
grpcsharp_shutdown();
+ isClosed = true;
// TODO: use proper logging here
Console.WriteLine("GRPC shutdown.");
}
-
- internal static GrpcThreadPool ThreadPool
- {
- get
- {
- return threadPool;
- }
- }
}
}
diff --git a/src/csharp/GrpcCore/Server.cs b/src/csharp/GrpcCore/Server.cs
index ef06c0a6a3..62ffa70b71 100644
--- a/src/csharp/GrpcCore/Server.cs
+++ b/src/csharp/GrpcCore/Server.cs
@@ -59,10 +59,6 @@ namespace Google.GRPC.Core
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
- static Server() {
- GrpcEnvironment.EnsureInitialized();
- }
-
public Server()
{
// TODO: what is the tag for server shutdown?
diff --git a/src/csharp/GrpcCore/Utils/PortPicker.cs b/src/csharp/GrpcCore/Utils/PortPicker.cs
deleted file mode 100644
index 7c83bf3886..0000000000
--- a/src/csharp/GrpcCore/Utils/PortPicker.cs
+++ /dev/null
@@ -1,50 +0,0 @@
-using System;
-using System.Net;
-using System.Net.Sockets;
-
-namespace Google.GRPC.Core.Utils
-{
- public class PortPicker
- {
- static Random random = new Random();
-
- // TODO: cleanup this code a bit
- public static int PickUnusedPort()
- {
- int port;
- do
- {
- port = random.Next(2000, 50000);
-
- } while(!IsPortAvailable(port));
- return port;
- }
-
- // TODO: cleanup this code a bit
- public static bool IsPortAvailable(int port)
- {
- bool available = true;
-
- TcpListener server = null;
- try
- {
- IPAddress ipAddress = Dns.GetHostEntry("localhost").AddressList[0];
- server = new TcpListener(ipAddress, port);
- server.Start();
- }
- catch (Exception ex)
- {
- available = false;
- }
- finally
- {
- if (server != null)
- {
- server.Stop();
- }
- }
- return available;
- }
- }
-}
-
diff --git a/src/csharp/GrpcCore/Utils/RecordingQueue.cs b/src/csharp/GrpcCore/Utils/RecordingQueue.cs
index 5a91852c8c..d73fc0fc78 100644
--- a/src/csharp/GrpcCore/Utils/RecordingQueue.cs
+++ b/src/csharp/GrpcCore/Utils/RecordingQueue.cs
@@ -38,6 +38,7 @@ using System.Collections.Concurrent;
namespace Google.GRPC.Core.Utils
{
+ // TODO: replace this by something that implements IAsyncEnumerator.
/// <summary>
/// Observer that allows us to await incoming messages one-by-one.
/// The implementation is not ideal and class will be probably replaced
diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs
index 1cfb6da0fa..1472db6e07 100644
--- a/src/csharp/GrpcCoreTests/ClientServerTest.cs
+++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs
@@ -43,7 +43,7 @@ namespace Google.GRPC.Core.Tests
{
public class ClientServerTest
{
- string serverAddr = "localhost:" + PortPicker.PickUnusedPort();
+ string host = "localhost";
Method<string, string> unaryEchoStringMethod = new Method<string, string>(
MethodType.Unary,
@@ -54,15 +54,17 @@ namespace Google.GRPC.Core.Tests
[Test]
public void EmptyCall()
{
+ GrpcEnvironment.Initialize();
+
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService")
.AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build());
- server.AddPort(serverAddr);
+ int port = server.AddPort(host + ":0");
server.Start();
- using (Channel channel = new Channel(serverAddr))
+ using (Channel channel = new Channel(host + ":" + port))
{
var call = new Call<string, string>(unaryEchoStringMethod, channel);
diff --git a/src/csharp/GrpcCoreTests/GrpcEnvironmentTest.cs b/src/csharp/GrpcCoreTests/GrpcEnvironmentTest.cs
index 781d1fc109..1bc6cce401 100644
--- a/src/csharp/GrpcCoreTests/GrpcEnvironmentTest.cs
+++ b/src/csharp/GrpcCoreTests/GrpcEnvironmentTest.cs
@@ -42,10 +42,30 @@ namespace Google.GRPC.Core.Tests
{
[Test]
public void InitializeAndShutdownGrpcEnvironment() {
- GrpcEnvironment.EnsureInitialized();
- Thread.Sleep(500);
+ GrpcEnvironment.Initialize();
Assert.IsNotNull(GrpcEnvironment.ThreadPool.CompletionQueue);
GrpcEnvironment.Shutdown();
}
+
+ [Test]
+ public void SubsequentInvocations() {
+ GrpcEnvironment.Initialize();
+ GrpcEnvironment.Initialize();
+ GrpcEnvironment.Shutdown();
+ GrpcEnvironment.Shutdown();
+ }
+
+ [Test]
+ public void InitializeAfterShutdown() {
+ GrpcEnvironment.Initialize();
+ var tp1 = GrpcEnvironment.ThreadPool;
+ GrpcEnvironment.Shutdown();
+
+ GrpcEnvironment.Initialize();
+ var tp2 = GrpcEnvironment.ThreadPool;
+ GrpcEnvironment.Shutdown();
+
+ Assert.IsFalse(Object.ReferenceEquals(tp1, tp2));
+ }
}
}
diff --git a/src/csharp/GrpcCoreTests/ServerTest.cs b/src/csharp/GrpcCoreTests/ServerTest.cs
index d5fd3aab46..1c70a3d6c4 100644
--- a/src/csharp/GrpcCoreTests/ServerTest.cs
+++ b/src/csharp/GrpcCoreTests/ServerTest.cs
@@ -42,10 +42,12 @@ namespace Google.GRPC.Core.Tests
public class ServerTest
{
[Test]
- public void StartAndShutdownServer() {
+ public void StartAndShutdownServer()
+ {
+ GrpcEnvironment.Initialize();
Server server = new Server();
- server.AddPort("localhost:" + PortPicker.PickUnusedPort());
+ int port = server.AddPort("localhost:0");
server.Start();
server.ShutdownAsync().Wait();
diff --git a/src/csharp/InteropClient/Client.cs b/src/csharp/InteropClient/Client.cs
index 86845cd76e..fcc6a572e4 100644
--- a/src/csharp/InteropClient/Client.cs
+++ b/src/csharp/InteropClient/Client.cs
@@ -93,6 +93,8 @@ namespace Google.GRPC.Interop
private void Run()
{
+ GrpcEnvironment.Initialize();
+
string addr = string.Format("{0}:{1}", options.serverHost, options.serverPort);
using (Channel channel = new Channel(addr))
{
diff --git a/src/csharp/MathClient/MathClient.cs b/src/csharp/MathClient/MathClient.cs
index a740c0ac49..a54c8e3809 100644
--- a/src/csharp/MathClient/MathClient.cs
+++ b/src/csharp/MathClient/MathClient.cs
@@ -42,6 +42,8 @@ namespace math
{
public static void Main (string[] args)
{
+ GrpcEnvironment.Initialize();
+
using (Channel channel = new Channel("127.0.0.1:23456"))
{
MathGrpc.IMathServiceClient stub = new MathGrpc.MathServiceClientStub(channel);
diff --git a/test/core/fling/client.c b/test/core/fling/client.c
index a91dfba9b0..6262699631 100644
--- a/test/core/fling/client.c
+++ b/test/core/fling/client.c
@@ -49,40 +49,77 @@ static grpc_byte_buffer *the_buffer;
static grpc_channel *channel;
static grpc_completion_queue *cq;
static grpc_call *call;
-
-static void init_ping_pong_request(void) {}
+static grpc_op ops[6];
+static grpc_op stream_init_op;
+static grpc_op stream_step_ops[2];
+static grpc_metadata_array initial_metadata_recv;
+static grpc_metadata_array trailing_metadata_recv;
+static grpc_byte_buffer *response_payload_recv = NULL;
+static grpc_call_details call_details;
+static grpc_status_code status;
+static char *details = NULL;
+static size_t details_capacity = 0;
+static grpc_op *op;
+
+static void init_ping_pong_request(void) {
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ op = ops;
+
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op++;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = the_buffer;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &initial_metadata_recv;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &response_payload_recv;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->data.recv_status_on_client.status_details_capacity = &details_capacity;
+ op++;
+}
static void step_ping_pong_request(void) {
- call = grpc_channel_create_call_old(channel, "/Reflector/reflectUnary",
- "localhost", gpr_inf_future);
- GPR_ASSERT(grpc_call_invoke_old(call, cq, (void *)1, (void *)1,
- GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
- GPR_ASSERT(grpc_call_start_write_old(call, the_buffer, (void *)1,
- GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
- grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
- GPR_ASSERT(grpc_call_start_read_old(call, (void *)1) == GRPC_CALL_OK);
- GPR_ASSERT(grpc_call_writes_done_old(call, (void *)1) == GRPC_CALL_OK);
- grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
- grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
- grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
+ call = grpc_channel_create_call(channel, cq, "/Reflector/reflectUnary",
+ "localhost", gpr_inf_future);
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_batch(call, ops, op - ops, (void *)1));
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
grpc_call_destroy(call);
call = NULL;
}
static void init_ping_pong_stream(void) {
- call = grpc_channel_create_call_old(channel, "/Reflector/reflectStream",
- "localhost", gpr_inf_future);
- GPR_ASSERT(grpc_call_invoke_old(call, cq, (void *)1, (void *)1, 0) ==
- GRPC_CALL_OK);
+ call = grpc_channel_create_call(channel, cq, "/Reflector/reflectStream",
+ "localhost", gpr_inf_future);
+ stream_init_op.op = GRPC_OP_SEND_INITIAL_METADATA;
+ stream_init_op.data.send_initial_metadata.count = 0;
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_batch(call, &stream_init_op, 1, (void *)1));
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+
+ stream_step_ops[0].op = GRPC_OP_SEND_MESSAGE;
+ stream_step_ops[0].data.send_message = the_buffer;
+ stream_step_ops[1].op = GRPC_OP_RECV_MESSAGE;
+ stream_step_ops[1].data.recv_message = &response_payload_recv;
}
static void step_ping_pong_stream(void) {
- GPR_ASSERT(grpc_call_start_write_old(call, the_buffer, (void *)1, 0) ==
- GRPC_CALL_OK);
- GPR_ASSERT(grpc_call_start_read_old(call, (void *)1) == GRPC_CALL_OK);
- grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_batch(call, stream_step_ops, 2, (void *)1));
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
}
@@ -99,7 +136,8 @@ typedef struct {
static const scenario scenarios[] = {
{"ping-pong-request", init_ping_pong_request, step_ping_pong_request},
- {"ping-pong-stream", init_ping_pong_stream, step_ping_pong_stream}, };
+ {"ping-pong-stream", init_ping_pong_stream, step_ping_pong_stream},
+};
int main(int argc, char **argv) {
gpr_slice slice = gpr_slice_from_copied_string("x");
@@ -148,6 +186,7 @@ int main(int argc, char **argv) {
cq = grpc_completion_queue_create();
the_buffer = grpc_byte_buffer_create(&slice, payload_size);
histogram = gpr_histogram_create(0.01, 60e9);
+
sc.init();
for (i = 0; i < 1000; i++) {
diff --git a/test/core/fling/server.c b/test/core/fling/server.c
index ba5e96ddc0..bc52059b38 100644
--- a/test/core/fling/server.c
+++ b/test/core/fling/server.c
@@ -52,17 +52,118 @@
static grpc_completion_queue *cq;
static grpc_server *server;
+static grpc_call *call;
+static grpc_call_details call_details;
+static grpc_metadata_array request_metadata_recv;
+static grpc_metadata_array initial_metadata_send;
+static grpc_byte_buffer *payload_buffer = NULL;
+/* Used to drain the terminal read in unary calls. */
+static grpc_byte_buffer *terminal_buffer = NULL;
+
+static grpc_op read_op;
+static grpc_op metadata_send_op;
+static grpc_op write_op;
+static grpc_op status_op[2];
+static int was_cancelled = 2;
+static grpc_op unary_ops[6];
static int got_sigint = 0;
+static void *tag(gpr_intptr t) { return (void *)t; }
+
+typedef enum {
+ FLING_SERVER_NEW_REQUEST = 1,
+ FLING_SERVER_READ_FOR_UNARY,
+ FLING_SERVER_BATCH_OPS_FOR_UNARY,
+ FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING,
+ FLING_SERVER_READ_FOR_STREAMING,
+ FLING_SERVER_WRITE_FOR_STREAMING,
+ FLING_SERVER_SEND_STATUS_FOR_STREAMING
+} fling_server_tags;
+
typedef struct {
gpr_refcount pending_ops;
gpr_uint32 flags;
} call_state;
static void request_call(void) {
- call_state *s = gpr_malloc(sizeof(call_state));
- gpr_ref_init(&s->pending_ops, 2);
- grpc_server_request_call_old(server, s);
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+ grpc_server_request_call(server, &call, &call_details, &request_metadata_recv,
+ cq, tag(FLING_SERVER_NEW_REQUEST));
+}
+
+static void handle_unary_method(void) {
+ grpc_op *op;
+
+ grpc_metadata_array_init(&initial_metadata_send);
+
+ op = unary_ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &terminal_buffer;
+ op++;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ if (payload_buffer == NULL) {
+ gpr_log(GPR_INFO, "NULL payload buffer !!!");
+ }
+ op->data.send_message = payload_buffer;
+ op++;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.status = GRPC_STATUS_OK;
+ op->data.send_status_from_server.trailing_metadata_count = 0;
+ op->data.send_status_from_server.status_details = "";
+ op++;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op++;
+
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_batch(call, unary_ops, op - unary_ops,
+ tag(FLING_SERVER_BATCH_OPS_FOR_UNARY)));
+}
+
+static void send_initial_metadata(void) {
+ grpc_metadata_array_init(&initial_metadata_send);
+ metadata_send_op.op = GRPC_OP_SEND_INITIAL_METADATA;
+ metadata_send_op.data.send_initial_metadata.count = 0;
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_batch(
+ call, &metadata_send_op, 1,
+ tag(FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING)));
+}
+
+static void start_read_op(int t) {
+ /* Starting read at server */
+ read_op.op = GRPC_OP_RECV_MESSAGE;
+ read_op.data.recv_message = &payload_buffer;
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &read_op, 1, tag(t)));
+}
+
+static void start_write_op(void) {
+ /* Starting write at server */
+ write_op.op = GRPC_OP_SEND_MESSAGE;
+ if (payload_buffer == NULL) {
+ gpr_log(GPR_INFO, "NULL payload buffer !!!");
+ }
+ write_op.data.send_message = payload_buffer;
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_batch(call, &write_op, 1,
+ tag(FLING_SERVER_WRITE_FOR_STREAMING)));
+}
+
+static void start_send_status(void) {
+ status_op[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ status_op[0].data.send_status_from_server.status = GRPC_STATUS_OK;
+ status_op[0].data.send_status_from_server.trailing_metadata_count = 0;
+ status_op[0].data.send_status_from_server.status_details = "";
+ status_op[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ status_op[1].data.recv_close_on_server.cancelled = &was_cancelled;
+
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(
+ call, status_op, 2,
+ tag(FLING_SERVER_SEND_STATUS_FOR_STREAMING)));
}
static void sigint_handler(int x) { got_sigint = 1; }
@@ -133,43 +234,66 @@ int main(int argc, char **argv) {
if (!ev) continue;
s = ev->tag;
switch (ev->type) {
- case GRPC_SERVER_RPC_NEW:
- if (ev->call != NULL) {
- /* initial ops are already started in request_call */
- if (0 == strcmp(ev->data.server_rpc_new.method,
- "/Reflector/reflectStream")) {
- s->flags = 0;
- } else {
- s->flags = GRPC_WRITE_BUFFER_HINT;
- }
- grpc_call_server_accept_old(ev->call, cq, s);
- grpc_call_server_end_initial_metadata_old(ev->call, s->flags);
- GPR_ASSERT(grpc_call_start_read_old(ev->call, s) == GRPC_CALL_OK);
- request_call();
- } else {
- GPR_ASSERT(shutdown_started);
- gpr_free(s);
+ case GRPC_OP_COMPLETE:
+ switch ((gpr_intptr)s) {
+ case FLING_SERVER_NEW_REQUEST:
+ if (call != NULL) {
+ if (0 ==
+ strcmp(call_details.method, "/Reflector/reflectStream")) {
+ /* Received streaming call. Send metadata here. */
+ start_read_op(FLING_SERVER_READ_FOR_STREAMING);
+ send_initial_metadata();
+ } else {
+ /* Received unary call. Can do all ops in one batch. */
+ start_read_op(FLING_SERVER_READ_FOR_UNARY);
+ }
+ } else {
+ GPR_ASSERT(shutdown_started);
+ }
+ /* request_call();
+ */
+ break;
+ case FLING_SERVER_READ_FOR_STREAMING:
+ if (payload_buffer != NULL) {
+ /* Received payload from client. */
+ start_write_op();
+ } else {
+ /* Received end of stream from client. */
+ start_send_status();
+ }
+ break;
+ case FLING_SERVER_WRITE_FOR_STREAMING:
+ /* Write completed at server */
+ start_read_op(FLING_SERVER_READ_FOR_STREAMING);
+ break;
+ case FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING:
+ /* Metadata send completed at server */
+ break;
+ case FLING_SERVER_SEND_STATUS_FOR_STREAMING:
+ /* Send status and close completed at server */
+ grpc_call_destroy(call);
+ request_call();
+ break;
+ case FLING_SERVER_READ_FOR_UNARY:
+ /* Finished payload read for unary. Start all reamaining
+ * unary ops in a batch.
+ */
+ handle_unary_method();
+ break;
+ case FLING_SERVER_BATCH_OPS_FOR_UNARY:
+ /* Finished unary call. */
+ grpc_call_destroy(call);
+ request_call();
+ break;
}
break;
+ case GRPC_SERVER_RPC_NEW:
case GRPC_WRITE_ACCEPTED:
- GPR_ASSERT(ev->data.write_accepted == GRPC_OP_OK);
- GPR_ASSERT(grpc_call_start_read_old(ev->call, s) == GRPC_CALL_OK);
- break;
case GRPC_READ:
- if (ev->data.read) {
- GPR_ASSERT(grpc_call_start_write_old(ev->call, ev->data.read, s,
- s->flags) == GRPC_CALL_OK);
- } else {
- GPR_ASSERT(grpc_call_start_write_status_old(ev->call, GRPC_STATUS_OK,
- NULL, s) == GRPC_CALL_OK);
- }
- break;
case GRPC_FINISH_ACCEPTED:
case GRPC_FINISHED:
- if (gpr_unref(&s->pending_ops)) {
- grpc_call_destroy(ev->call);
- gpr_free(s);
- }
+ gpr_log(GPR_ERROR, "Unexpected event type.");
+ abort();
break;
case GRPC_QUEUE_SHUTDOWN:
GPR_ASSERT(shutdown_started);
diff --git a/tools/gce_setup/grpc_docker.sh b/tools/gce_setup/grpc_docker.sh
index 2e02653864..1c38582cb8 100755
--- a/tools/gce_setup/grpc_docker.sh
+++ b/tools/gce_setup/grpc_docker.sh
@@ -24,6 +24,10 @@
# Allows gcloud ssh commands to run on freshly started docker instances.
_grpc_ensure_gcloud_ssh() {
local default_key_file="$HOME/.ssh/google_compute_engine"
+ if [ "$HOME" == "/" ]
+ then
+ default_key_file="/root/.ssh/google_compute_engine"
+ fi
[ -f $default_key_file ] || {
ssh-keygen -f $default_key_file -N '' > /dev/null || {
echo "could not precreate $default_key_file" 1>&2