aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-17 10:12:42 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-17 10:12:42 -0700
commitc851625fa4fcab421d19a29e965db0159f185121 (patch)
treea8c85b1422e72fc6fd7c2fea842b84d865554e2f /src/core
parent66abdaad3274b33e99042b5e493e749991201f81 (diff)
parent7885dc57d012c356553c503e8354eb68bb9168fb (diff)
Merge branch 'we-dont-need-no-backup' into oops-i-split-it-again
Diffstat (limited to 'src/core')
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c5
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c5
-rw-r--r--src/core/iomgr/pollset_posix.c5
-rw-r--r--src/core/support/log_win32.c3
-rw-r--r--src/core/surface/call.c41
-rw-r--r--src/core/surface/call.h1
-rw-r--r--src/core/surface/completion_queue.c5
-rw-r--r--src/core/surface/completion_queue.h3
-rw-r--r--src/core/surface/server.c10
-rw-r--r--src/core/transport/stream_op.h9
10 files changed, 78 insertions, 9 deletions
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 8e585a007d..e5e3435feb 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -100,8 +100,9 @@ static int multipoll_with_epoll_pollset_maybe_work(
if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
timeout_ms = -1;
} else {
- timeout_ms = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout_ms <= 0) {
+ timeout_ms = gpr_time_to_millis(
+ gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500)));
+ if (timeout_ms < 0) {
return 1;
}
}
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 5ee6980732..d21c52c0f0 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -116,8 +116,9 @@ static int multipoll_with_poll_pollset_maybe_work(
if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
timeout = -1;
} else {
- timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout <= 0) {
+ timeout = gpr_time_to_millis(
+ gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500)));
+ if (timeout < 0) {
return 1;
}
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index db704b9df1..0fe54c8f1d 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -346,8 +346,9 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset,
if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
timeout = -1;
} else {
- timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout <= 0) {
+ timeout = gpr_time_to_millis(
+ gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500)));
+ if (timeout < 0) {
return 1;
}
}
diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c
index 159c7e052c..f878e6aaa1 100644
--- a/src/core/support/log_win32.c
+++ b/src/core/support/log_win32.c
@@ -42,6 +42,7 @@
#include <grpc/support/log_win32.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
+#include <grpc/support/string_util.h>
#include "src/core/support/string.h"
#include "src/core/support/string_win32.h"
@@ -106,7 +107,7 @@ char *gpr_format_message(DWORD messageid) {
NULL, messageid,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPTSTR)(&tmessage), 0, NULL);
- if (status == 0) return gpr_strdup("Unable to retreive error string");
+ if (status == 0) return gpr_strdup("Unable to retrieve error string");
message = gpr_tchar_to_char(tmessage);
LocalFree(tmessage);
return message;
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 872705e996..b309cd57b0 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -193,6 +193,7 @@ struct grpc_call {
and a strong upper bound of a count of masters to be calculated. */
gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
+ gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT];
reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
/* Dynamic array of ioreq's that have completed: the count of
@@ -236,6 +237,7 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
+ gpr_uint32 incoming_message_flags;
grpc_iomgr_closure destroy_closure;
grpc_iomgr_closure on_done_recv;
grpc_iomgr_closure on_done_send;
@@ -717,6 +719,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
} else if (msg.length > 0) {
call->reading_message = 1;
call->incoming_message_length = msg.length;
+ call->incoming_message_flags = msg.flags;
return 1;
} else {
finish_message(call);
@@ -866,6 +869,7 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
grpc_ioreq_data data;
+ gpr_uint32 flags;
grpc_metadata_batch mdb;
size_t i;
GPR_ASSERT(op->send_ops == NULL);
@@ -891,8 +895,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
case WRITE_STATE_STARTED:
if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+ flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE];
grpc_sopb_add_begin_message(
- &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+ &call->send_ops, grpc_byte_buffer_length(data.send_message), flags);
copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
op->send_ops = &call->send_ops;
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
@@ -1034,6 +1039,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
have_ops |= 1u << op;
call->request_data[op] = data;
+ call->request_flags[op] = reqs[i].flags;
call->request_set[op] = set;
}
@@ -1269,6 +1275,14 @@ static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
}
+static int are_write_flags_valid(gpr_uint32 flags) {
+ /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
+ const gpr_uint32 allowed_write_positions =
+ (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
+ const gpr_uint32 invalid_positions = ~allowed_write_positions;
+ return !(flags & invalid_positions);
+}
+
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag) {
grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
@@ -1291,30 +1305,43 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op = &ops[in];
switch (op->op) {
case GRPC_OP_SEND_INITIAL_METADATA:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
req->data.send_metadata.count = op->data.send_initial_metadata.count;
req->data.send_metadata.metadata =
op->data.send_initial_metadata.metadata;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
+ if (!are_write_flags_valid(op->flags)){
+ return GRPC_CALL_ERROR_INVALID_FLAGS;
+ }
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
+ req->flags = ops->flags;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_CLOSE;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_CLIENT;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
+ req->flags = op->flags;
req->data.send_metadata.count =
op->data.send_status_from_server.trailing_metadata_count;
req->data.send_metadata.metadata =
@@ -1332,24 +1359,33 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->op = GRPC_IOREQ_SEND_CLOSE;
break;
case GRPC_OP_RECV_INITIAL_METADATA:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req->data.recv_metadata = op->data.recv_initial_metadata;
+ req->flags = op->flags;
break;
case GRPC_OP_RECV_MESSAGE:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_MESSAGE;
req->data.recv_message = op->data.recv_message;
+ req->flags = op->flags;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_STATUS;
+ req->flags = op->flags;
req->data.recv_status.set_value = set_status_value_directly;
req->data.recv_status.user_data = op->data.recv_status_on_client.status;
req = &reqs[out++];
@@ -1367,8 +1403,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
finish_func = finish_batch_with_close;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_STATUS;
+ req->flags = op->flags;
req->data.recv_status.set_value = set_cancelled_value;
req->data.recv_status.user_data =
op->data.recv_close_on_server.cancelled;
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 7ea6cd7fa9..a613bac8cd 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -79,6 +79,7 @@ typedef union {
typedef struct {
grpc_ioreq_op op;
grpc_ioreq_data data;
+ gpr_uint32 flags; /**< A copy of the write flags from grpc_op */
} grpc_ioreq;
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 57ecf365cc..bd0fabf9da 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -73,6 +73,7 @@ struct grpc_completion_queue {
event *queue;
/* Fixed size chained hash table of events for pluck() */
event *buckets[NUM_TAG_BUCKETS];
+ int is_server_cq;
};
grpc_completion_queue *grpc_completion_queue_create(void) {
@@ -323,3 +324,7 @@ void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
+
+void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
+
+int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 2249d0e789..e76910c00b 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -63,4 +63,7 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc);
+void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
+int grpc_cq_is_server_cq(grpc_completion_queue *cc);
+
#endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index a76a6c7812..43e9afd56b 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -709,6 +709,7 @@ void grpc_server_register_completion_queue(grpc_server *server,
if (server->cqs[i] == cq) return;
}
GRPC_CQ_INTERNAL_REF(cq, "server");
+ grpc_cq_mark_server_cq(cq);
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
server->cq_count * sizeof(grpc_completion_queue *));
@@ -1081,6 +1082,9 @@ grpc_call_error grpc_server_request_call(
GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
initial_metadata, cq_bound_to_call,
cq_for_notification, tag);
+ if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ }
grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = BATCH_CALL;
rc.tag = tag;
@@ -1099,6 +1103,9 @@ grpc_call_error grpc_server_request_registered_call(
grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
registered_method *registered_method = rm;
+ if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ }
grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = REGISTERED_CALL;
rc.tag = tag;
@@ -1153,6 +1160,7 @@ static void begin_call(grpc_server *server, call_data *calld,
rc->data.batch.details->deadline = calld->deadline;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata;
+ r->flags = 0;
r++;
publish = publish_registered_or_batch;
break;
@@ -1160,10 +1168,12 @@ static void begin_call(grpc_server *server, call_data *calld,
*rc->data.registered.deadline = calld->deadline;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.registered.initial_metadata;
+ r->flags = 0;
r++;
if (rc->data.registered.optional_payload) {
r->op = GRPC_IOREQ_RECV_MESSAGE;
r->data.recv_message = rc->data.registered.optional_payload;
+ r->flags = 0;
r++;
}
publish = publish_registered_or_batch;
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 644be567f3..aae101edad 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -58,11 +58,18 @@ typedef enum grpc_stream_op_code {
GRPC_OP_SLICE
} grpc_stream_op_code;
+/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
+ * compression for the message */
+#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
+/** Mask of all valid internal flags. */
+#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
+
/* Arguments for GRPC_OP_BEGIN_MESSAGE */
typedef struct grpc_begin_message {
/* How many bytes of data will this message contain */
gpr_uint32 length;
- /* Write flags for the message: see grpc.h GRPC_WRITE_xxx */
+ /* Write flags for the message: see grpc.h GRPC_WRITE_* for the public bits,
+ * GRPC_WRITE_INTERNAL_* for the internal ones. */
gpr_uint32 flags;
} grpc_begin_message;