aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar ctiller <ctiller@google.com>2014-12-17 16:36:59 -0800
committerGravatar Michael Lumish <mlumish@google.com>2014-12-19 13:04:38 -0800
commitd79b4865ce124d12b2394c2755df8f6539418e93 (patch)
treec3fc33e4cf7d043e7cb61fe4a526e491554aaed1 /src
parenta4b6f5df94d6293455f2fed86b56491b7f6dd39e (diff)
Introduce the (outside-of-iomgr) pollset API.
This CL introduces the public side of this interface. There will need to be an iomgr-private API also, but this will be a per-implementation API and so is not covered here. I've taken care of wiring the interface through the codebase in the manner that I expect it will be used. Change on 2014/12/17 by ctiller <ctiller@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=82376987
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/call_op_string.c2
-rw-r--r--src/core/channel/channel_stack.h3
-rw-r--r--src/core/channel/client_channel.c3
-rw-r--r--src/core/channel/connected_channel.c1
-rw-r--r--src/core/iomgr/endpoint.c4
-rw-r--r--src/core/iomgr/endpoint.h6
-rw-r--r--src/core/iomgr/pollset.c37
-rw-r--r--src/core/iomgr/pollset.h51
-rw-r--r--src/core/iomgr/tcp_posix.c11
-rw-r--r--src/core/security/secure_endpoint.c9
-rw-r--r--src/core/surface/call.c2
-rw-r--r--src/core/surface/completion_queue.c8
-rw-r--r--src/core/surface/completion_queue.h3
-rw-r--r--src/core/surface/server.c2
-rw-r--r--src/core/transport/chttp2_transport.c13
-rw-r--r--src/core/transport/transport.c5
-rw-r--r--src/core/transport/transport.h12
-rw-r--r--src/core/transport/transport_impl.h3
18 files changed, 164 insertions, 11 deletions
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c
index 4a98cbfbbb..9a7838ce2f 100644
--- a/src/core/channel/call_op_string.c
+++ b/src/core/channel/call_op_string.c
@@ -107,7 +107,7 @@ char *grpc_call_op_string(grpc_call_op *op) {
op->data.deadline.tv_nsec);
break;
case GRPC_SEND_START:
- bprintf(&b, "SEND_START");
+ bprintf(&b, "SEND_START pollset=%p", op->data.start.pollset);
break;
case GRPC_SEND_MESSAGE:
bprintf(&b, "SEND_MESSAGE");
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index b837caf97b..14e972f539 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -108,6 +108,9 @@ typedef struct {
/* Argument data, matching up with grpc_call_op_type names */
union {
+ struct {
+ grpc_pollset *pollset;
+ } start;
grpc_byte_buffer *message;
grpc_mdelem *metadata;
gpr_timespec deadline;
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index a146a7b6a0..6329932330 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -98,6 +98,7 @@ struct call_data {
void (*on_complete)(void *user_data, grpc_op_error error);
void *on_complete_user_data;
gpr_uint32 start_flags;
+ grpc_pollset *pollset;
} waiting;
} s;
};
@@ -186,6 +187,7 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
calld->s.waiting.on_complete = op->done_cb;
calld->s.waiting.on_complete_user_data = op->user_data;
calld->s.waiting.start_flags = op->flags;
+ calld->s.waiting.pollset = op->data.start.pollset;
chand->waiting_children[chand->waiting_child_count++] = calld;
gpr_mu_unlock(&chand->mu);
@@ -523,6 +525,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
call_ops[i].user_data =
waiting_children[i]->s.waiting.on_complete_user_data;
+ call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 8581fb41d6..5faa03c2f4 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -132,6 +132,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
op->user_data);
break;
case GRPC_SEND_START:
+ grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset);
grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb);
end_bufferable_op(op, chand, calld, 0);
break;
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index 259c948720..f1944bf672 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -44,6 +44,10 @@ grpc_endpoint_write_status grpc_endpoint_write(
return ep->vtable->write(ep, slices, nslices, cb, user_data, deadline);
}
+void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
+ ep->vtable->add_to_pollset(ep, pollset);
+}
+
void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); }
void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); }
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index 88949fb03f..bbd800bea8 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -34,6 +34,7 @@
#ifndef __GRPC_INTERNAL_IOMGR_ENDPOINT_H__
#define __GRPC_INTERNAL_IOMGR_ENDPOINT_H__
+#include "src/core/iomgr/pollset.h"
#include <grpc/support/slice.h>
#include <grpc/support/time.h>
@@ -69,6 +70,7 @@ struct grpc_endpoint_vtable {
grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices,
size_t nslices, grpc_endpoint_write_cb cb,
void *user_data, gpr_timespec deadline);
+ void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*shutdown)(grpc_endpoint *ep);
void (*destroy)(grpc_endpoint *ep);
};
@@ -92,6 +94,10 @@ grpc_endpoint_write_status grpc_endpoint_write(
void grpc_endpoint_shutdown(grpc_endpoint *ep);
void grpc_endpoint_destroy(grpc_endpoint *ep);
+/* Add an endpoint to a pollset, so that when the pollset is polled, events from
+ this endpoint are considered */
+void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset);
+
struct grpc_endpoint {
const grpc_endpoint_vtable *vtable;
};
diff --git a/src/core/iomgr/pollset.c b/src/core/iomgr/pollset.c
new file mode 100644
index 0000000000..62a0019eb3
--- /dev/null
+++ b/src/core/iomgr/pollset.c
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/pollset.h"
+
+void grpc_pollset_init(grpc_pollset *pollset) { pollset->unused = 0; }
+void grpc_pollset_destroy(grpc_pollset *pollset) {}
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
new file mode 100644
index 0000000000..ba1a9d5429
--- /dev/null
+++ b/src/core/iomgr/pollset.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_H_
+#define __GRPC_INTERNAL_IOMGR_POLLSET_H_
+
+/* A grpc_pollset is a set of file descriptors that a higher level item is
+ interested in. For example:
+ - a server will typically keep a pollset containing all connected channels,
+ so that it can find new calls to service
+ - a completion queue might keep a pollset with an entry for each transport
+ that is servicing a call that it's tracking */
+/* Eventually different implementations of iomgr will provide their own
+ grpc_pollset structs. As this is just a dummy wrapper to get the API in,
+ we just define a simple type here. */
+typedef struct { char unused; } grpc_pollset;
+
+void grpc_pollset_init(grpc_pollset *pollset);
+void grpc_pollset_destroy(grpc_pollset *pollset);
+
+#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_H_ */
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index b59e916e13..bc3ce69e47 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -538,9 +538,14 @@ static grpc_endpoint_write_status grpc_tcp_write(
return status;
}
-static const grpc_endpoint_vtable vtable = {grpc_tcp_notify_on_read,
- grpc_tcp_write, grpc_tcp_shutdown,
- grpc_tcp_destroy};
+static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
+ /* tickle the pollset so we crash if things aren't wired correctly */
+ pollset->unused++;
+}
+
+static const grpc_endpoint_vtable vtable = {
+ grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset,
+ grpc_tcp_shutdown, grpc_tcp_destroy};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index 34be4256a9..cab09ca49d 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -325,8 +325,15 @@ static void endpoint_unref(grpc_endpoint *secure_ep) {
secure_endpoint_unref(ep);
}
+static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
+ grpc_pollset *pollset) {
+ secure_endpoint *ep = (secure_endpoint *)secure_ep;
+ grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
+}
+
static const grpc_endpoint_vtable vtable = {
- endpoint_notify_on_read, endpoint_write, endpoint_shutdown, endpoint_unref};
+ endpoint_notify_on_read, endpoint_write, endpoint_add_to_pollset,
+ endpoint_shutdown, endpoint_unref};
grpc_endpoint *grpc_secure_endpoint_create(
struct tsi_frame_protector *protector, grpc_endpoint *transport,
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index ba4c806602..6270ce640d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -409,6 +409,7 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.done_cb = done_invoke;
+ op.data.start.pollset = grpc_cq_pollset(cq);
op.user_data = call;
elem = CALL_ELEM_FROM_CALL(call, 0);
@@ -480,6 +481,7 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call,
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.done_cb = do_nothing;
+ op.data.start.pollset = grpc_cq_pollset(call->cq);
op.user_data = NULL;
elem = CALL_ELEM_FROM_CALL(call, 0);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index b890f9dab7..4837f5b978 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -66,6 +66,8 @@ struct grpc_completion_queue {
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
once all queued events are drained */
gpr_refcount refs;
+ /* the set of low level i/o things that concern this cq */
+ grpc_pollset pollset;
/* 0 initially, 1 once we've begun shutting down */
int shutdown;
/* Head of a linked list of queued events (prev points to the last element) */
@@ -87,6 +89,7 @@ grpc_completion_queue *grpc_completion_queue_create() {
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->refs, 1);
+ grpc_pollset_init(&cc->pollset);
cc->allow_polling = 1;
return cc;
}
@@ -367,6 +370,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
GPR_ASSERT(cc->queue == NULL);
+ grpc_pollset_destroy(&cc->pollset);
gpr_free(cc);
}
@@ -392,3 +396,7 @@ void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) {
gpr_log(GPR_INFO, "pending ops:%s", tmp);
#endif
}
+
+grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
+ return &cc->pollset;
+}
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 8f3d34ab31..2e752a3fe0 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -36,6 +36,7 @@
/* Internal API for completion channels */
+#include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h>
/* A finish func is executed whenever the event consumer calls
@@ -101,4 +102,6 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc);
void grpc_cq_dump_pending_ops(grpc_completion_queue *cc);
+grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
+
#endif /* __GRPC_INTERNAL_SURFACE_COMPLETION_QUEUE_H__ */
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 9eaa3c0602..3829e7aa8f 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -491,6 +491,8 @@ grpc_transport_setup_result grpc_server_setup_transport(
}
filters[i] = &grpc_connected_channel_filter;
+ grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cq));
+
channel = grpc_channel_create_from_filters(filters, num_filters,
s->channel_args, mdctx, 0);
chand = (channel_data *)grpc_channel_stack_element(
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 0570439813..a8ae8cc5bc 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -1696,14 +1696,23 @@ static void run_callbacks(transport *t) {
}
}
+static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
+ transport *t = (transport *)gt;
+ lock(t);
+ if (t->ep) {
+ grpc_endpoint_add_to_pollset(t->ep, pollset);
+ }
+ unlock(t);
+}
+
/*
* INTEGRATION GLUE
*/
static const grpc_transport_vtable vtable = {
sizeof(stream), init_stream, send_batch, set_allow_window_updates,
- destroy_stream, abort_stream, goaway, close_transport, send_ping,
- destroy_transport};
+ add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
+ send_ping, destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
void *arg,
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 1c44abcadf..0ca67acb92 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -66,6 +66,11 @@ void grpc_transport_set_allow_window_updates(grpc_transport *transport,
transport->vtable->set_allow_window_updates(transport, stream, allow);
}
+void grpc_transport_add_to_pollset(grpc_transport *transport,
+ grpc_pollset *pollset) {
+ transport->vtable->add_to_pollset(transport, pollset);
+}
+
void grpc_transport_destroy_stream(grpc_transport *transport,
grpc_stream *stream) {
transport->vtable->destroy_stream(transport, stream);
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 6a089a2a15..00dacbf5b9 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -36,6 +36,7 @@
#include <stddef.h>
+#include "src/core/iomgr/pollset.h"
#include "src/core/transport/stream_op.h"
/* forward declarations */
@@ -202,15 +203,18 @@ void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
grpc_status_code status);
+void grpc_transport_add_to_pollset(grpc_transport *transport,
+ grpc_pollset *pollset);
+
/* Advise peer of pending connection termination. */
-void grpc_transport_goaway(struct grpc_transport *transport,
- grpc_status_code status, gpr_slice debug_data);
+void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
+ gpr_slice debug_data);
/* Close a transport. Aborts all open streams. */
-void grpc_transport_close(struct grpc_transport *transport);
+void grpc_transport_close(grpc_transport *transport);
/* Destroy the transport */
-void grpc_transport_destroy(struct grpc_transport *transport);
+void grpc_transport_destroy(grpc_transport *transport);
/* Return type for grpc_transport_setup_callback */
typedef struct grpc_transport_setup_result {
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index 328ead2872..9f497b9cba 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -53,6 +53,9 @@ typedef struct grpc_transport_vtable {
void (*set_allow_window_updates)(grpc_transport *self, grpc_stream *stream,
int allow);
+ /* implementation of grpc_transport_add_to_pollset */
+ void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset);
+
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);