aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-25 12:54:23 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-25 12:54:23 -0700
commite039f0338333e1a2f368ec20740662fb2eac2875 (patch)
treec4267e757c1196815e0559790d56a6e7679c373d
parent3f475422ecb8cd5c648ce86f126122ba6dee1c9c (diff)
Plumbing transport_op changes through
-rw-r--r--BUILD4
-rw-r--r--Makefile2
-rw-r--r--build.json2
-rw-r--r--gRPC.podspec3
-rw-r--r--src/core/channel/connected_channel.c94
-rw-r--r--src/core/channel/http_client_filter.c23
-rw-r--r--src/core/channel/http_server_filter.c26
-rw-r--r--src/core/channel/noop_filter.c34
-rw-r--r--src/core/surface/call.c2
-rw-r--r--src/core/surface/channel.c24
-rw-r--r--src/core/surface/channel_create.c6
-rw-r--r--src/core/surface/client.c89
-rw-r--r--src/core/surface/client.h41
-rw-r--r--src/core/surface/lame_client.c43
-rw-r--r--src/core/surface/server.c100
-rw-r--r--src/core/transport/chttp2_transport.c4
-rw-r--r--src/core/transport/transport.c18
-rw-r--r--src/core/transport/transport.h2
-rw-r--r--src/core/transport/transport_impl.h2
-rw-r--r--test/core/end2end/fixtures/chttp2_fullstack.c1
-rw-r--r--tools/doxygen/Doxyfile.core.internal2
-rw-r--r--vsprojects/grpc/grpc.vcxproj3
-rw-r--r--vsprojects/grpc/grpc.vcxproj.filters6
-rw-r--r--vsprojects/grpc_unsecure/grpc_unsecure.vcxproj3
-rw-r--r--vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters6
25 files changed, 146 insertions, 394 deletions
diff --git a/BUILD b/BUILD
index 745b62933f..29cc77374e 100644
--- a/BUILD
+++ b/BUILD
@@ -208,7 +208,6 @@ cc_library(
"src/core/surface/byte_buffer_queue.h",
"src/core/surface/call.h",
"src/core/surface/channel.h",
- "src/core/surface/client.h",
"src/core/surface/completion_queue.h",
"src/core/surface/event_string.h",
"src/core/surface/init.h",
@@ -333,7 +332,6 @@ cc_library(
"src/core/surface/call_log_batch.c",
"src/core/surface/channel.c",
"src/core/surface/channel_create.c",
- "src/core/surface/client.c",
"src/core/surface/completion_queue.c",
"src/core/surface/event_string.c",
"src/core/surface/init.c",
@@ -456,7 +454,6 @@ cc_library(
"src/core/surface/byte_buffer_queue.h",
"src/core/surface/call.h",
"src/core/surface/channel.h",
- "src/core/surface/client.h",
"src/core/surface/completion_queue.h",
"src/core/surface/event_string.h",
"src/core/surface/init.h",
@@ -559,7 +556,6 @@ cc_library(
"src/core/surface/call_log_batch.c",
"src/core/surface/channel.c",
"src/core/surface/channel_create.c",
- "src/core/surface/client.c",
"src/core/surface/completion_queue.c",
"src/core/surface/event_string.c",
"src/core/surface/init.c",
diff --git a/Makefile b/Makefile
index 283f3c6cd0..7ca182d9de 100644
--- a/Makefile
+++ b/Makefile
@@ -3083,7 +3083,6 @@ LIBGRPC_SRC = \
src/core/surface/call_log_batch.c \
src/core/surface/channel.c \
src/core/surface/channel_create.c \
- src/core/surface/client.c \
src/core/surface/completion_queue.c \
src/core/surface/event_string.c \
src/core/surface/init.c \
@@ -3341,7 +3340,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/surface/call_log_batch.c \
src/core/surface/channel.c \
src/core/surface/channel_create.c \
- src/core/surface/client.c \
src/core/surface/completion_queue.c \
src/core/surface/event_string.c \
src/core/surface/init.c \
diff --git a/build.json b/build.json
index b05f423c83..dc3d2ac1c5 100644
--- a/build.json
+++ b/build.json
@@ -169,7 +169,6 @@
"src/core/surface/byte_buffer_queue.h",
"src/core/surface/call.h",
"src/core/surface/channel.h",
- "src/core/surface/client.h",
"src/core/surface/completion_queue.h",
"src/core/surface/event_string.h",
"src/core/surface/init.h",
@@ -272,7 +271,6 @@
"src/core/surface/call_log_batch.c",
"src/core/surface/channel.c",
"src/core/surface/channel_create.c",
- "src/core/surface/client.c",
"src/core/surface/completion_queue.c",
"src/core/surface/event_string.c",
"src/core/surface/init.c",
diff --git a/gRPC.podspec b/gRPC.podspec
index f8b5a99bdc..1707ee7eaa 100644
--- a/gRPC.podspec
+++ b/gRPC.podspec
@@ -210,7 +210,6 @@ Pod::Spec.new do |s|
'src/core/surface/byte_buffer_queue.h',
'src/core/surface/call.h',
'src/core/surface/channel.h',
- 'src/core/surface/client.h',
'src/core/surface/completion_queue.h',
'src/core/surface/event_string.h',
'src/core/surface/init.h',
@@ -342,7 +341,6 @@ Pod::Spec.new do |s|
'src/core/surface/call_log_batch.c',
'src/core/surface/channel.c',
'src/core/surface/channel_create.c',
- 'src/core/surface/client.c',
'src/core/surface/completion_queue.c',
'src/core/surface/event_string.c',
'src/core/surface/init.c',
@@ -464,7 +462,6 @@ Pod::Spec.new do |s|
'src/core/surface/byte_buffer_queue.h',
'src/core/surface/call.h',
'src/core/surface/channel.h',
- 'src/core/surface/client.h',
'src/core/surface/completion_queue.h',
'src/core/surface/event_string.h',
'src/core/surface/init.h',
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 6fad077c62..1d30b073ab 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -61,36 +61,21 @@ typedef struct connected_channel_call_data { void *unused; } call_data;
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
-static void con_start_transport_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void con_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_transport_perform_op(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
+ grpc_transport_perform_stream_op(chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
-/* Currently we assume all channel operations should just be pushed up. */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
+static void con_start_transport_op(grpc_channel_element *elem,
+ grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
-
- switch (op->type) {
- case GRPC_CHANNEL_GOAWAY:
- grpc_transport_goaway(chand->transport, op->data.goaway.status,
- op->data.goaway.message);
- break;
- case GRPC_CHANNEL_DISCONNECT:
- grpc_transport_close(chand->transport);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_channel_next_op(elem, op);
- break;
- }
+ grpc_transport_perform_op(chand->transport, op);
}
/* Constructor for call_data */
@@ -136,61 +121,15 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_connected_channel_filter = {
- con_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "connected",
-};
-
-/* Transport callback to accept a new stream... calls up to handle it */
-static void accept_stream(void *user_data, grpc_transport *transport,
- const void *transport_server_data) {
- grpc_channel_element *elem = user_data;
- channel_data *chand = elem->channel_data;
- grpc_channel_op op;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- GPR_ASSERT(chand->transport == transport);
-
- op.type = GRPC_ACCEPT_CALL;
- op.dir = GRPC_CALL_UP;
- op.data.accept_call.transport = transport;
- op.data.accept_call.transport_server_data = transport_server_data;
- channel_op(elem, NULL, &op);
-}
-
-static void transport_goaway(void *user_data, grpc_transport *transport,
- grpc_status_code status, gpr_slice debug) {
- /* transport got goaway ==> call up and handle it */
- grpc_channel_element *elem = user_data;
- channel_data *chand = elem->channel_data;
- grpc_channel_op op;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- GPR_ASSERT(chand->transport == transport);
-
- op.type = GRPC_TRANSPORT_GOAWAY;
- op.dir = GRPC_CALL_UP;
- op.data.goaway.status = status;
- op.data.goaway.message = debug;
- channel_op(elem, NULL, &op);
-}
-
-static void transport_closed(void *user_data, grpc_transport *transport) {
- /* transport was closed ==> call up and handle it */
- grpc_channel_element *elem = user_data;
- channel_data *chand = elem->channel_data;
- grpc_channel_op op;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- GPR_ASSERT(chand->transport == transport);
-
- op.type = GRPC_TRANSPORT_CLOSED;
- op.dir = GRPC_CALL_UP;
- channel_op(elem, NULL, &op);
-}
-
-const grpc_transport_callbacks connected_channel_transport_callbacks = {
- accept_stream, transport_goaway, transport_closed,
+ con_start_transport_stream_op,
+ con_start_transport_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "connected",
};
grpc_transport_setup_result grpc_connected_channel_bind_transport(
@@ -213,6 +152,5 @@ grpc_transport_setup_result grpc_connected_channel_bind_transport(
channel_stack->call_stack_size += grpc_transport_stream_size(transport);
ret.user_data = elem;
- ret.callbacks = &connected_channel_transport_callbacks;
return ret;
}
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 3f10c4fc88..6928a59c38 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -134,23 +134,6 @@ static void hc_start_transport_op(grpc_call_element *elem,
grpc_call_next_op(elem, op);
}
-/* Called on special channel events, such as disconnection or new incoming
- calls on the server */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
@@ -222,6 +205,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_client_filter = {
- hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "http-client"};
+ hc_start_transport_op, grpc_channel_next_op, sizeof(call_data),
+ init_call_elem, destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, "http-client"};
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 6434502bdc..dac53e9bf1 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -72,9 +72,6 @@ typedef struct channel_data {
grpc_mdctx *mdctx;
} channel_data;
-/* used to silence 'variable not used' warnings */
-static void ignore_unused(void *ignored) {}
-
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *channeld = elem->channel_data;
@@ -216,23 +213,6 @@ static void hs_start_transport_op(grpc_call_element *elem,
grpc_call_next_op(elem, op);
}
-/* Called on special channel events, such as disconnection or new incoming
- calls on the server */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
@@ -298,6 +278,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_server_filter = {
- hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "http-server"};
+ hs_start_transport_op, grpc_channel_next_op, sizeof(call_data),
+ init_call_elem, destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, "http-server"};
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index d472b80744..1478f04a3c 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -62,31 +62,14 @@ static void noop_mutate_op(grpc_call_element *elem,
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
-static void noop_start_transport_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void noop_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
noop_mutate_op(elem, op);
/* pass control down the stack */
grpc_call_next_op(elem, op);
}
-/* Called on special channel events, such as disconnection or new incoming
- calls on the server */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
@@ -136,7 +119,12 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
ignore_unused(channeld);
}
-const grpc_channel_filter grpc_no_op_filter = {
- noop_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "no-op"};
+const grpc_channel_filter grpc_no_op_filter = {noop_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "no-op"};
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index ddff3efb32..7a8eb8c54f 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -1154,7 +1154,7 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
elem = CALL_ELEM_FROM_CALL(call, 0);
op->context = call->context;
- elem->filter->start_transport_op(elem, op);
+ elem->filter->start_transport_stream_op(elem, op);
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index a3c4dcebc1..6c4b407a85 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -39,7 +39,6 @@
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include "src/core/surface/call.h"
-#include "src/core/surface/client.h"
#include "src/core/surface/init.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -238,22 +237,15 @@ void grpc_channel_internal_unref(grpc_channel *channel) {
}
}
-void grpc_channel_destroy(grpc_channel *channel) {
- grpc_channel_op op;
- grpc_channel_element *elem;
-
- elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
-
- op.type = GRPC_CHANNEL_GOAWAY;
- op.dir = GRPC_CALL_DOWN;
- op.data.goaway.status = GRPC_STATUS_OK;
- op.data.goaway.message = gpr_slice_from_copied_string("Client disconnect");
- elem->filter->channel_op(elem, NULL, &op);
-
- op.type = GRPC_CHANNEL_DISCONNECT;
- op.dir = GRPC_CALL_DOWN;
- elem->filter->channel_op(elem, NULL, &op);
+static void execute_op(grpc_channel *channel, grpc_transport_op *op) {
+ abort();
+}
+void grpc_channel_destroy(grpc_channel *channel) {
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.disconnect = 1;
+ execute_op(channel, &op);
GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel");
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 8efd86b9f6..14ff63a2e3 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -38,9 +38,7 @@
#include "src/core/channel/client_channel.h"
#include "src/core/client_config/resolver_registry.h"
-#include "src/core/client_config/subchannels/tcp_subchannel.h"
#include "src/core/surface/channel.h"
-#include "src/core/surface/client.h"
/* Create a client channel:
Asynchronously: - resolve target
@@ -53,7 +51,6 @@ grpc_channel *grpc_channel_create(const char *target,
const grpc_channel_filter *filters[MAX_FILTERS];
grpc_resolver *resolver;
int n = 0;
- filters[n++] = &grpc_client_surface_filter;
/* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
@@ -61,7 +58,8 @@ grpc_channel *grpc_channel_create(const char *target,
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
- resolver = grpc_resolver_create(target, grpc_create_tcp_subchannel_factory());
+ GPR_ASSERT(!"NULL should be a subchannel factory creation below");
+ resolver = grpc_resolver_create(target, NULL);
if (!resolver) {
return NULL;
}
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
deleted file mode 100644
index 9c9cba5771..0000000000
--- a/src/core/surface/client.c
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/surface/client.h"
-
-#include "src/core/surface/call.h"
-#include "src/core/surface/channel.h"
-#include "src/core/support/string.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-typedef struct { void *unused; } call_data;
-
-typedef struct { void *unused; } channel_data;
-
-static void client_start_transport_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_call_next_op(elem, op);
-}
-
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- switch (op->type) {
- case GRPC_ACCEPT_CALL:
- gpr_log(GPR_ERROR, "Client cannot accept new calls");
- break;
- case GRPC_TRANSPORT_CLOSED:
- grpc_client_channel_closed(elem);
- break;
- case GRPC_TRANSPORT_GOAWAY:
- gpr_slice_unref(op->data.goaway.message);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_channel_next_op(elem, op);
- }
-}
-
-static void init_call_elem(grpc_call_element *elem,
- const void *transport_server_data,
- grpc_transport_stream_op *initial_op) {}
-
-static void destroy_call_elem(grpc_call_element *elem) {}
-
-static void init_channel_elem(grpc_channel_element *elem,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
- GPR_ASSERT(is_first);
- GPR_ASSERT(!is_last);
-}
-
-static void destroy_channel_elem(grpc_channel_element *elem) {}
-
-const grpc_channel_filter grpc_client_surface_filter = {
- client_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "client",
-};
diff --git a/src/core/surface/client.h b/src/core/surface/client.h
deleted file mode 100644
index 9db2ccf3d2..0000000000
--- a/src/core/surface/client.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_SURFACE_CLIENT_H
-#define GRPC_INTERNAL_CORE_SURFACE_CLIENT_H
-
-#include "src/core/channel/channel_stack.h"
-
-extern const grpc_channel_filter grpc_client_surface_filter;
-
-#endif /* GRPC_INTERNAL_CORE_SURFACE_CLIENT_H */
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 4b55e9dc91..5235d3f7f4 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -49,16 +49,16 @@ typedef struct {
typedef struct { grpc_mdctx *mdctx; } channel_data;
-static void lame_start_transport_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void lame_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- if (op->send_ops) {
+ if (op->send_ops != NULL) {
grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
op->on_done_send->cb(op->on_done_send->cb_arg, 0);
}
- if (op->recv_ops) {
+ if (op->recv_ops != NULL) {
char tmp[GPR_LTOA_MIN_BUFSIZE];
grpc_metadata_batch mdb;
gpr_ltoa(GRPC_STATUS_UNKNOWN, tmp);
@@ -77,22 +77,21 @@ static void lame_start_transport_op(grpc_call_element *elem,
*op->recv_state = GRPC_STREAM_CLOSED;
op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
}
- if (op->on_consumed) {
+ if (op->on_consumed != NULL) {
op->on_consumed->cb(op->on_consumed->cb_arg, 0);
}
}
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- switch (op->type) {
- case GRPC_CHANNEL_GOAWAY:
- gpr_slice_unref(op->data.goaway.message);
- break;
- case GRPC_CHANNEL_DISCONNECT:
- grpc_client_channel_closed(elem);
- break;
- default:
- break;
+static void lame_start_transport_op(grpc_channel_element *elem,
+ grpc_transport_op *op) {
+ if (op->on_connectivity_state_change) {
+ GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE);
+ *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE;
+ op->on_connectivity_state_change->cb(
+ op->on_connectivity_state_change->cb_arg, 1);
+ }
+ if (op->on_consumed != NULL) {
+ op->on_consumed->cb(op->on_consumed->cb_arg, 1);
}
}
@@ -118,9 +117,15 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
static const grpc_channel_filter lame_filter = {
- lame_start_transport_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "lame-client",
+ lame_start_transport_stream_op,
+ lame_start_transport_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "lame-client",
};
grpc_channel *grpc_lame_client_channel_create(void) {
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 607344a7a6..568f7925dd 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -115,6 +115,7 @@ typedef struct channel_registered_method {
struct channel_data {
grpc_server *server;
size_t num_calls;
+ grpc_connectivity_state connectivity_state;
grpc_channel *channel;
grpc_mdstr *path_key;
grpc_mdstr *authority_key;
@@ -125,6 +126,7 @@ struct channel_data {
gpr_uint32 registered_method_slots;
gpr_uint32 registered_method_max_probes;
grpc_iomgr_closure finish_destroy_channel_closure;
+ grpc_iomgr_closure channel_connectivity_changed;
};
typedef struct shutdown_tag {
@@ -539,13 +541,41 @@ static void server_mutate_op(grpc_call_element *elem,
}
}
-static void server_start_transport_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void server_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
+static void accept_stream(void *cd, grpc_transport *transport,
+ const void *transport_server_data) {
+ channel_data *chand = cd;
+ /* create a call */
+ grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0,
+ gpr_inf_future);
+}
+
+static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
+ channel_data *chand = cd;
+ grpc_server *server = chand->server;
+ if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op.connectivity_state = &chand->connectivity_state;
+ grpc_channel_next_op(grpc_channel_stack_element(
+ grpc_channel_get_channel_stack(chand->channel), 0),
+ &op);
+ } else {
+ gpr_mu_lock(&server->mu_global);
+ destroy_channel(chand);
+ gpr_mu_unlock(&server->mu_global);
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
+ }
+}
+
+#if 0
static void channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem, grpc_channel_op *op) {
channel_data *chand = elem->channel_data;
@@ -576,39 +606,45 @@ static void channel_op(grpc_channel_element *elem,
break;
}
}
+#endif
typedef struct {
channel_data *chand;
int send_goaway;
int send_disconnect;
grpc_iomgr_closure finish_shutdown_channel_closure;
+
+ /* for use during shutdown: the goaway message to send */
+ gpr_slice goaway_message;
} shutdown_channel_args;
-static void finish_shutdown_channel(void *p, int success) {
+static void destroy_shutdown_channel_args(void *p, int success) {
shutdown_channel_args *sca = p;
- grpc_channel_op op;
-
- if (sca->send_goaway) {
- op.type = GRPC_CHANNEL_GOAWAY;
- op.dir = GRPC_CALL_DOWN;
- op.data.goaway.status = GRPC_STATUS_OK;
- op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
- channel_op(grpc_channel_stack_element(
- grpc_channel_get_channel_stack(sca->chand->channel), 0),
- NULL, &op);
- }
- if (sca->send_disconnect) {
- op.type = GRPC_CHANNEL_DISCONNECT;
- op.dir = GRPC_CALL_DOWN;
- channel_op(grpc_channel_stack_element(
- grpc_channel_get_channel_stack(sca->chand->channel), 0),
- NULL, &op);
- }
GRPC_CHANNEL_INTERNAL_UNREF(sca->chand->channel, "shutdown");
-
+ gpr_slice_unref(sca->goaway_message);
gpr_free(sca);
}
+static void finish_shutdown_channel(void *p, int success) {
+ shutdown_channel_args *sca = p;
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+
+ op.send_goaway = sca->send_goaway;
+ sca->goaway_message = gpr_slice_from_copied_string("Server shutdown");
+ op.goaway_message = &sca->goaway_message;
+ op.goaway_status = GRPC_STATUS_OK;
+ op.disconnect = sca->send_disconnect;
+ grpc_iomgr_closure_init(&sca->finish_shutdown_channel_closure,
+ destroy_shutdown_channel_args, sca);
+ op.on_consumed = &sca->finish_shutdown_channel_closure;
+
+ grpc_channel_next_op(
+ grpc_channel_stack_element(
+ grpc_channel_get_channel_stack(sca->chand->channel), 0),
+ &op);
+}
+
static void shutdown_channel(channel_data *chand, int send_goaway,
int send_disconnect) {
shutdown_channel_args *sca;
@@ -687,6 +723,9 @@ static void init_channel_elem(grpc_channel_element *elem,
chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
chand->next = chand->prev = chand;
chand->registered_methods = NULL;
+ chand->connectivity_state = GRPC_CHANNEL_IDLE;
+ grpc_iomgr_closure_init(&chand->channel_connectivity_changed,
+ channel_connectivity_changed, chand);
}
static void destroy_channel_elem(grpc_channel_element *elem) {
@@ -717,8 +756,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- server_start_transport_op,
- channel_op,
+ server_start_transport_stream_op,
+ grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
destroy_call_elem,
@@ -852,6 +891,7 @@ grpc_transport_setup_result grpc_server_setup_transport(
gpr_uint32 slots;
gpr_uint32 probes;
gpr_uint32 max_probes = 0;
+ grpc_transport_op op;
grpc_transport_setup_result result;
for (i = 0; i < s->channel_filter_count; i++) {
@@ -863,7 +903,9 @@ grpc_transport_setup_result grpc_server_setup_transport(
filters[i] = &grpc_connected_channel_filter;
for (i = 0; i < s->cq_count; i++) {
- grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
+ memset(&op, 0, sizeof(op));
+ op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
+ grpc_transport_perform_op(transport, &op);
}
channel =
@@ -875,6 +917,14 @@ grpc_transport_setup_result grpc_server_setup_transport(
server_ref(s);
chand->channel = channel;
+ GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
+ memset(&op, 0, sizeof(op));
+ op.set_accept_stream = accept_stream;
+ op.set_accept_stream_user_data = chand;
+ op.on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op.connectivity_state = &chand->connectivity_state;
+ grpc_transport_perform_op(transport, &op);
+
num_registered_methods = 0;
for (rm = s->registered_methods; rm; rm = rm->next) {
num_registered_methods++;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 685098bcba..f2568c01e0 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -1038,11 +1038,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
init_stream,
perform_op,
- add_to_pollset,
destroy_stream,
- goaway,
- close_transport,
- send_ping,
destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 40faa27211..a570cba33e 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -38,15 +38,6 @@ size_t grpc_transport_stream_size(grpc_transport *transport) {
return transport->vtable->sizeof_stream;
}
-void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
- gpr_slice message) {
- transport->vtable->goaway(transport, status, message);
-}
-
-void grpc_transport_close(grpc_transport *transport) {
- transport->vtable->close(transport);
-}
-
void grpc_transport_destroy(grpc_transport *transport) {
transport->vtable->destroy(transport);
}
@@ -68,20 +59,11 @@ void grpc_transport_perform_op(grpc_transport *transport,
transport->vtable->perform_op(transport, op);
}
-void grpc_transport_add_to_pollset(grpc_transport *transport,
- grpc_pollset *pollset) {
- transport->vtable->add_to_pollset(transport, pollset);
-}
-
void grpc_transport_destroy_stream(grpc_transport *transport,
grpc_stream *stream) {
transport->vtable->destroy_stream(transport, stream);
}
-void grpc_transport_ping(grpc_transport *transport, grpc_iomgr_closure *cb) {
- transport->vtable->ping(transport, cb);
-}
-
void grpc_transport_setup_cancel(grpc_transport_setup *setup) {
setup->vtable->cancel(setup);
}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 7f6a37d048..1acd665a1d 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -85,6 +85,8 @@ typedef struct grpc_transport_stream_op {
/** Transport op: a set of operations to perform on a transport as a whole */
typedef struct grpc_transport_op {
+ /** called when processing of this op is done */
+ grpc_iomgr_closure *on_consumed;
/** connectivity monitoring */
grpc_iomgr_closure *on_connectivity_state_change;
grpc_connectivity_state *connectivity_state;
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index b65b1d5607..8283939050 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -52,7 +52,7 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_perform_op */
void (*perform_op)(grpc_transport *self, grpc_stream *stream,
- grpc_transport_stream_op *op);
+ grpc_transport_op *op);
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);
diff --git a/test/core/end2end/fixtures/chttp2_fullstack.c b/test/core/end2end/fixtures/chttp2_fullstack.c
index b83e227a89..e647434509 100644
--- a/test/core/end2end/fixtures/chttp2_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_fullstack.c
@@ -39,7 +39,6 @@
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/surface/channel.h"
-#include "src/core/surface/client.h"
#include "src/core/surface/server.h"
#include "src/core/transport/chttp2_transport.h"
#include <grpc/support/alloc.h>
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 4f00104c02..bc2cfaf6a4 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -845,7 +845,6 @@ src/core/profiling/timers_preciseclock.h \
src/core/surface/byte_buffer_queue.h \
src/core/surface/call.h \
src/core/surface/channel.h \
-src/core/surface/client.h \
src/core/surface/completion_queue.h \
src/core/surface/event_string.h \
src/core/surface/init.h \
@@ -970,7 +969,6 @@ src/core/surface/call_details.c \
src/core/surface/call_log_batch.c \
src/core/surface/channel.c \
src/core/surface/channel_create.c \
-src/core/surface/client.c \
src/core/surface/completion_queue.c \
src/core/surface/event_string.c \
src/core/surface/init.c \
diff --git a/vsprojects/grpc/grpc.vcxproj b/vsprojects/grpc/grpc.vcxproj
index 0a7da41dbf..6c55c4f834 100644
--- a/vsprojects/grpc/grpc.vcxproj
+++ b/vsprojects/grpc/grpc.vcxproj
@@ -234,7 +234,6 @@
<ClInclude Include="..\..\src\core\surface\byte_buffer_queue.h" />
<ClInclude Include="..\..\src\core\surface\call.h" />
<ClInclude Include="..\..\src\core\surface\channel.h" />
- <ClInclude Include="..\..\src\core\surface\client.h" />
<ClInclude Include="..\..\src\core\surface\completion_queue.h" />
<ClInclude Include="..\..\src\core\surface\event_string.h" />
<ClInclude Include="..\..\src\core\surface\init.h" />
@@ -454,8 +453,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\surface\channel_create.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\surface\client.c">
- </ClCompile>
<ClCompile Include="..\..\src\core\surface\completion_queue.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\event_string.c">
diff --git a/vsprojects/grpc/grpc.vcxproj.filters b/vsprojects/grpc/grpc.vcxproj.filters
index 8d1815c751..d2ada43b6d 100644
--- a/vsprojects/grpc/grpc.vcxproj.filters
+++ b/vsprojects/grpc/grpc.vcxproj.filters
@@ -280,9 +280,6 @@
<ClCompile Include="..\..\src\core\surface\channel_create.c">
<Filter>src\core\surface</Filter>
</ClCompile>
- <ClCompile Include="..\..\src\core\surface\client.c">
- <Filter>src\core\surface</Filter>
- </ClCompile>
<ClCompile Include="..\..\src\core\surface\completion_queue.c">
<Filter>src\core\surface</Filter>
</ClCompile>
@@ -650,9 +647,6 @@
<ClInclude Include="..\..\src\core\surface\channel.h">
<Filter>src\core\surface</Filter>
</ClInclude>
- <ClInclude Include="..\..\src\core\surface\client.h">
- <Filter>src\core\surface</Filter>
- </ClInclude>
<ClInclude Include="..\..\src\core\surface\completion_queue.h">
<Filter>src\core\surface</Filter>
</ClInclude>
diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
index b96e3092e2..09b3997277 100644
--- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
@@ -216,7 +216,6 @@
<ClInclude Include="..\..\src\core\surface\byte_buffer_queue.h" />
<ClInclude Include="..\..\src\core\surface\call.h" />
<ClInclude Include="..\..\src\core\surface\channel.h" />
- <ClInclude Include="..\..\src\core\surface\client.h" />
<ClInclude Include="..\..\src\core\surface\completion_queue.h" />
<ClInclude Include="..\..\src\core\surface\event_string.h" />
<ClInclude Include="..\..\src\core\surface\init.h" />
@@ -392,8 +391,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\surface\channel_create.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\surface\client.c">
- </ClCompile>
<ClCompile Include="..\..\src\core\surface\completion_queue.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\event_string.c">
diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
index dd2846eef1..3b65c46f2b 100644
--- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -214,9 +214,6 @@
<ClCompile Include="..\..\src\core\surface\channel_create.c">
<Filter>src\core\surface</Filter>
</ClCompile>
- <ClCompile Include="..\..\src\core\surface\client.c">
- <Filter>src\core\surface</Filter>
- </ClCompile>
<ClCompile Include="..\..\src\core\surface\completion_queue.c">
<Filter>src\core\surface</Filter>
</ClCompile>
@@ -533,9 +530,6 @@
<ClInclude Include="..\..\src\core\surface\channel.h">
<Filter>src\core\surface</Filter>
</ClInclude>
- <ClInclude Include="..\..\src\core\surface\client.h">
- <Filter>src\core\surface</Filter>
- </ClInclude>
<ClInclude Include="..\..\src\core\surface\completion_queue.h">
<Filter>src\core\surface</Filter>
</ClInclude>