aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-29 13:46:17 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-29 13:46:17 -0700
commit455253f52cb184208b0fb6bba87f5774230fb7f3 (patch)
treeeb873d6ec32c266f54208dc96353958c8b495a8e /src
parent27aa7afb728ad83a742f429dc47266900d6fd449 (diff)
parentdba5e0f96be51cce1343a31b5bd3734342642a29 (diff)
Merge branch 'one-read' into swappy
Diffstat (limited to 'src')
-rw-r--r--src/core/iomgr/tcp_posix.c2
-rw-r--r--src/core/profiling/timers.c5
-rw-r--r--src/core/security/google_default_credentials.c2
-rw-r--r--src/core/support/alloc.c2
-rw-r--r--src/core/support/time_win32.c2
-rw-r--r--src/core/surface/call.c3
-rw-r--r--src/core/transport/chttp2_transport.c20
-rw-r--r--src/core/transport/transport_op_string.c3
-rw-r--r--src/objective-c/examples/Sample/Podfile.lock44
-rw-r--r--src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m89
-rw-r--r--src/python/src/grpc/_adapter/_c_test.py5
-rw-r--r--src/python/src/grpc/_adapter/_call.c237
-rw-r--r--src/python/src/grpc/_adapter/_call.h28
-rw-r--r--src/python/src/grpc/_adapter/_completion_queue.c171
-rw-r--r--src/python/src/grpc/_adapter/_low_test.py8
-rw-r--r--src/python/src/grpc/_adapter/_server.c20
-rw-r--r--src/python/src/grpc/_adapter/_server.h4
-rw-r--r--src/python/src/grpc/_adapter/_tag.c65
-rw-r--r--src/python/src/grpc/_adapter/_tag.h70
-rw-r--r--src/python/src/grpc/_adapter/rear.py2
-rw-r--r--src/python/src/setup.py1
21 files changed, 582 insertions, 201 deletions
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 3825713aba..cef5e09760 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -375,7 +375,7 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) {
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
} else {
- /* Spurious read event, consume it here */
+ /* We've consumed the edge, request a new one */
slice_state_destroy(&read_state);
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
}
diff --git a/src/core/profiling/timers.c b/src/core/profiling/timers.c
index 7cc79bd22b..bd1700ffd8 100644
--- a/src/core/profiling/timers.c
+++ b/src/core/profiling/timers.c
@@ -40,10 +40,12 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
#include <stdio.h>
typedef struct grpc_timer_entry {
grpc_precise_clock tm;
+ gpr_thd_id thd;
const char* tag;
void* id;
const char* file;
@@ -85,7 +87,7 @@ static void log_report_locked(grpc_timers_log* log) {
grpc_timer_entry* entry = &(log->log[i]);
fprintf(fp, "GRPC_LAT_PROF ");
grpc_precise_clock_print(&entry->tm, fp);
- fprintf(fp, " %s %p %s %d\n", entry->tag, entry->id, entry->file,
+ fprintf(fp, " %p %s %p %s %d\n", (void*)(gpr_intptr)entry->thd, entry->tag, entry->id, entry->file,
entry->line);
}
@@ -121,6 +123,7 @@ void grpc_timers_log_add(grpc_timers_log* log, const char* tag, void* id,
entry->id = id;
entry->file = file;
entry->line = line;
+ entry->thd = gpr_thd_currentid();
gpr_mu_unlock(&log->mu);
}
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index d2f46ddd07..0e4b9fc9d3 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -163,7 +163,7 @@ grpc_credentials *grpc_google_default_credentials_create(void) {
gpr_mu_lock(&g_mu);
if (default_credentials != NULL) {
- result = default_credentials;
+ result = grpc_credentials_ref(default_credentials);
serving_cached_credentials = 1;
goto end;
}
diff --git a/src/core/support/alloc.c b/src/core/support/alloc.c
index a19a0141d4..d2ed82e771 100644
--- a/src/core/support/alloc.c
+++ b/src/core/support/alloc.c
@@ -55,7 +55,7 @@ void *gpr_realloc(void *p, size_t size) {
}
void *gpr_malloc_aligned(size_t size, size_t alignment_log) {
- size_t alignment = 1 << alignment_log;
+ size_t alignment = ((size_t)1) << alignment_log;
size_t extra = alignment - 1 + sizeof(void *);
void *p = gpr_malloc(size + extra);
void **ret = (void **)(((gpr_uintptr)p + extra) & ~(alignment - 1));
diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c
index 539470bccf..f4443b5c2d 100644
--- a/src/core/support/time_win32.c
+++ b/src/core/support/time_win32.c
@@ -64,7 +64,7 @@ void gpr_sleep_until(gpr_timespec until) {
}
delta = gpr_time_sub(until, now);
- sleep_millis = delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS;
+ sleep_millis = (DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS;
Sleep(sleep_millis);
}
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index d21bfaa592..e7767cc22d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -34,6 +34,7 @@
#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
+#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h"
#include "src/core/surface/channel.h"
@@ -694,6 +695,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
static void call_on_done_recv(void *pc, int success) {
grpc_call *call = pc;
size_t i;
+ GRPC_TIMER_MARK(CALL_ON_DONE_RECV_BEGIN, 0);
lock(call);
call->receiving = 0;
if (success) {
@@ -734,6 +736,7 @@ static void call_on_done_recv(void *pc, int success) {
unlock(call);
GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
+ GRPC_TIMER_MARK(CALL_ON_DONE_RECV_END, 0);
}
static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 65161bd1af..dd7c60710b 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -37,6 +37,7 @@
#include <stdio.h>
#include <string.h>
+#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/transport/chttp2/frame_data.h"
#include "src/core/transport/chttp2/frame_goaway.h"
@@ -783,6 +784,8 @@ static void unlock(transport *t) {
grpc_stream_op_buffer nuke_now;
const grpc_transport_callbacks *cb = t->cb;
+ GRPC_TIMER_MARK(HTTP2_UNLOCK_BEGIN, 0);
+
grpc_sopb_init(&nuke_now);
if (t->nuke_later_sopb.nops) {
grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb);
@@ -831,6 +834,8 @@ static void unlock(transport *t) {
/* finally unlock */
gpr_mu_unlock(&t->mu);
+ GRPC_TIMER_MARK(HTTP2_UNLOCK_CLEANUP, 0);
+
/* perform some callbacks if necessary */
for (i = 0; i < num_goaways; i++) {
cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug);
@@ -861,6 +866,8 @@ static void unlock(transport *t) {
grpc_sopb_destroy(&nuke_now);
gpr_free(goaways);
+
+ GRPC_TIMER_MARK(HTTP2_UNLOCK_END, 0);
}
/*
@@ -907,8 +914,8 @@ static int prepare_write(transport *t) {
window_delta = grpc_chttp2_preencode(
s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
- FLOWCTL_TRACE(t, t, outgoing, 0, -window_delta);
- FLOWCTL_TRACE(t, s, outgoing, s->id, -window_delta);
+ FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
+ FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
t->outgoing_window -= window_delta;
s->outgoing_window -= window_delta;
@@ -1270,8 +1277,8 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
return GRPC_CHTTP2_CONNECTION_ERROR;
}
- FLOWCTL_TRACE(t, t, incoming, 0, -t->incoming_frame_size);
- FLOWCTL_TRACE(t, s, incoming, s->id, -t->incoming_frame_size);
+ FLOWCTL_TRACE(t, t, incoming, 0, -(gpr_int64)t->incoming_frame_size);
+ FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size);
t->incoming_window -= t->incoming_frame_size;
s->incoming_window -= t->incoming_frame_size;
@@ -1411,7 +1418,10 @@ static int init_header_frame_parser(transport *t, int is_continuation) {
gpr_log(GPR_ERROR,
"ignoring out of order new stream request on server; last stream "
"id=%d, new stream id=%d",
- t->last_incoming_stream_id, t->incoming_stream);
+ t->last_incoming_stream_id, t->incoming_stream_id);
+ return init_skip_frame(t, 1);
+ } else if ((t->incoming_stream_id & 1) == 0) {
+ gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d", t->incoming_stream_id);
return init_skip_frame(t, 1);
}
t->incoming_stream = NULL;
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index 04c1d79022..7bbe8276c3 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -41,6 +41,9 @@
#include <grpc/support/alloc.h>
#include <grpc/support/useful.h>
+/* These routines are here to facilitate debugging - they produce string
+ representations of various transport data structures */
+
static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
gpr_strvec_add(b, gpr_strdup("key="));
gpr_strvec_add(
diff --git a/src/objective-c/examples/Sample/Podfile.lock b/src/objective-c/examples/Sample/Podfile.lock
deleted file mode 100644
index ccf5aa0f06..0000000000
--- a/src/objective-c/examples/Sample/Podfile.lock
+++ /dev/null
@@ -1,44 +0,0 @@
-PODS:
- - gRPC (0.0.1):
- - gRPC/C-Core (= 0.0.1)
- - gRPC/GRPCClient (= 0.0.1)
- - gRPC/ProtoRPC (= 0.0.1)
- - gRPC/RxLibrary (= 0.0.1)
- - gRPC/C-Core (0.0.1):
- - OpenSSL (~> 1.0.200)
- - gRPC/GRPCClient (0.0.1):
- - gRPC/C-Core
- - gRPC/RxLibrary
- - gRPC/ProtoRPC (0.0.1):
- - gRPC/GRPCClient
- - gRPC/RxLibrary
- - gRPC/RxLibrary (0.0.1)
- - OpenSSL (1.0.201)
- - ProtocolBuffers (1.9.8)
- - RemoteTest (0.0.1):
- - gRPC (~> 0.0)
- - ProtocolBuffers (~> 1.9)
- - Route_guide (0.0.1):
- - ProtocolBuffers (~> 1.9)
-
-DEPENDENCIES:
- - gRPC (from `../../../..`)
- - RemoteTest (from `RemoteTestClient`)
- - Route_guide (from `RouteGuideClient`)
-
-EXTERNAL SOURCES:
- gRPC:
- :path: ../../../..
- RemoteTest:
- :path: RemoteTestClient
- Route_guide:
- :path: RouteGuideClient
-
-SPEC CHECKSUMS:
- gRPC: f6c1bf5dde59ab543e4bd1d5e2ea56da4a9a0253
- OpenSSL: 4e990d04b14015c49c800c400b86ae44a4818a5c
- ProtocolBuffers: 9a4a171c0c7cc8f21dd29aeca4f9ac775d84a880
- RemoteTest: 021a51c04d5795f286b379ca5ef14d0be5b2fb9b
- Route_guide: a277da8eef182774abb050d7b81109f5878f8652
-
-COCOAPODS: 0.36.0
diff --git a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
index 6c5de81277..2ef6a6e4ab 100644
--- a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
+++ b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
@@ -34,6 +34,7 @@
#import <UIKit/UIKit.h>
#import <XCTest/XCTest.h>
+#import <gRPC/GRXWriter+Immediate.h>
#import <RemoteTest/Messages.pb.h>
#import <RemoteTest/Test.pb.h>
@@ -48,43 +49,85 @@
_service = [[RMTTestService alloc] initWithHost:@"grpc-test.sandbox.google.com"];
}
-- (void)testEmptyRPC {
- __weak XCTestExpectation *noRPCError = [self expectationWithDescription:@"RPC succeeded."];
- __weak XCTestExpectation *responded = [self expectationWithDescription:@"Response received."];
+// Tests as described here: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
- [_service emptyCallWithRequest:[RMTEmpty defaultInstance]
- handler:^(RMTEmpty *response, NSError *error) {
+- (void)testEmptyUnaryRPC {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyUnary"];
+
+ RMTEmpty *request = [RMTEmpty defaultInstance];
+
+ [_service emptyCallWithRequest:request handler:^(RMTEmpty *response, NSError *error) {
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
- [noRPCError fulfill];
- XCTAssertNotNil(response, @"nil response received.");
- [responded fulfill];
+
+ id expectedResponse = [RMTEmpty defaultInstance];
+ XCTAssertEqualObjects(response, expectedResponse);
+
+ [expectation fulfill];
}];
[self waitForExpectationsWithTimeout:2. handler:nil];
}
-- (void)testSimpleProtoRPC {
- __weak XCTestExpectation *noRPCError = [self expectationWithDescription:@"RPC succeeded."];
- __weak XCTestExpectation *responded = [self expectationWithDescription:@"Response received."];
- __weak XCTestExpectation *validResponse = [self expectationWithDescription:@"Valid response."];
+- (void)testLargeUnaryRPC {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyUnary"];
RMTSimpleRequest *request = [[[[[[RMTSimpleRequestBuilder alloc] init]
- setResponseSize:100]
- setFillUsername:YES]
- setFillOauthScope:YES]
+ setResponseType:RMTPayloadTypeCompressable]
+ setResponseSize:314159]
+ setPayloadBuilder:[[[RMTPayloadBuilder alloc] init]
+ setBody:[NSMutableData dataWithLength:271828]]]
build];
+
[_service unaryCallWithRequest:request handler:^(RMTSimpleResponse *response, NSError *error) {
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
- [noRPCError fulfill];
- XCTAssertNotNil(response, @"nil response received.");
- [responded fulfill];
- // We expect empty strings, not nil:
- XCTAssertNotNil(response.username, @"Response's username is nil.");
- XCTAssertNotNil(response.oauthScope, @"Response's OAuth scope is nil.");
- [validResponse fulfill];
+
+ id expectedResponse = [[[[RMTSimpleResponseBuilder alloc] init]
+ setPayloadBuilder:[[[[RMTPayloadBuilder alloc] init]
+ setType:RMTPayloadTypeCompressable]
+ setBody:[NSMutableData dataWithLength:314159]]]
+ build];
+ XCTAssertEqualObjects(response, expectedResponse);
+
+ [expectation fulfill];
}];
- [self waitForExpectationsWithTimeout:2. handler:nil];
+ [self waitForExpectationsWithTimeout:4. handler:nil];
+}
+
+- (void)testClientStreamingRPC {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyUnary"];
+
+ id request1 = [[[[RMTStreamingInputCallRequestBuilder alloc] init]
+ setPayloadBuilder:[[[RMTPayloadBuilder alloc] init]
+ setBody:[NSMutableData dataWithLength:27182]]]
+ build];
+ id request2 = [[[[RMTStreamingInputCallRequestBuilder alloc] init]
+ setPayloadBuilder:[[[RMTPayloadBuilder alloc] init]
+ setBody:[NSMutableData dataWithLength:8]]]
+ build];
+ id request3 = [[[[RMTStreamingInputCallRequestBuilder alloc] init]
+ setPayloadBuilder:[[[RMTPayloadBuilder alloc] init]
+ setBody:[NSMutableData dataWithLength:1828]]]
+ build];
+ id request4 = [[[[RMTStreamingInputCallRequestBuilder alloc] init]
+ setPayloadBuilder:[[[RMTPayloadBuilder alloc] init]
+ setBody:[NSMutableData dataWithLength:45904]]]
+ build];
+ id<GRXWriter> writer = [GRXWriter writerWithContainer:@[request1, request2, request3, request4]];
+
+ [_service streamingInputCallWithRequestsWriter:writer
+ handler:^(RMTStreamingInputCallResponse *response, NSError *error) {
+ XCTAssertNil(error, @"Finished with unexpected error: %@", error);
+
+ id expectedResponse = [[[[RMTStreamingInputCallResponseBuilder alloc] init]
+ setAggregatedPayloadSize:74922]
+ build];
+ XCTAssertEqualObjects(response, expectedResponse);
+
+ [expectation fulfill];
+ }];
+
+ [self waitForExpectationsWithTimeout:4. handler:nil];
}
@end
diff --git a/src/python/src/grpc/_adapter/_c_test.py b/src/python/src/grpc/_adapter/_c_test.py
index 437a6730cd..6e15adbda8 100644
--- a/src/python/src/grpc/_adapter/_c_test.py
+++ b/src/python/src/grpc/_adapter/_c_test.py
@@ -83,8 +83,11 @@ class _CTest(unittest.TestCase):
_c.init()
channel = _c.Channel('%s:%d' % (host, 12345), None)
- call = _c.Call(channel, method, host, time.time() + _TIMEOUT)
+ completion_queue = _c.CompletionQueue()
+ call = _c.Call(channel, completion_queue, method, host,
+ time.time() + _TIMEOUT)
del call
+ del completion_queue
del channel
_c.shut_down()
diff --git a/src/python/src/grpc/_adapter/_call.c b/src/python/src/grpc/_adapter/_call.c
index bf96c1a3fa..d833268fc9 100644
--- a/src/python/src/grpc/_adapter/_call.c
+++ b/src/python/src/grpc/_adapter/_call.c
@@ -36,90 +36,166 @@
#include <math.h>
#include <Python.h>
#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
#include "grpc/_adapter/_channel.h"
#include "grpc/_adapter/_completion_queue.h"
#include "grpc/_adapter/_error.h"
+#include "grpc/_adapter/_tag.h"
-static int pygrpc_call_init(Call *self, PyObject *args, PyObject *kwds) {
- const PyObject *channel;
+static PyObject *pygrpc_call_new(PyTypeObject *type, PyObject *args, PyObject *kwds) {
+ Call *self = (Call *)type->tp_alloc(type, 0);
+ Channel *channel;
+ CompletionQueue *completion_queue;
const char *method;
const char *host;
double deadline;
- static char *kwlist[] = {"channel", "method", "host", "deadline", NULL};
-
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!ssd:Call", kwlist,
- &pygrpc_ChannelType, &channel, &method,
- &host, &deadline)) {
- return -1;
+ static char *kwlist[] = {"channel", "completion_queue",
+ "method", "host", "deadline", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(
+ args, kwds, "O!O!ssd:Call", kwlist,
+ &pygrpc_ChannelType, &channel,
+ &pygrpc_CompletionQueueType, &completion_queue,
+ &method, &host, &deadline)) {
+ return NULL;
}
/* TODO(nathaniel): Hoist the gpr_timespec <-> PyFloat arithmetic into its own
* function with its own test coverage.
*/
- self->c_call = grpc_channel_create_call_old(
- ((Channel *)channel)->c_channel, method, host,
+ self->c_call = grpc_channel_create_call(
+ channel->c_channel, completion_queue->c_completion_queue, method, host,
gpr_time_from_nanos(deadline * GPR_NS_PER_SEC));
-
- return 0;
+ self->completion_queue = completion_queue;
+ Py_INCREF(self->completion_queue);
+ self->channel = channel;
+ Py_INCREF(self->channel);
+ grpc_call_details_init(&self->call_details);
+ grpc_metadata_array_init(&self->recv_metadata);
+ grpc_metadata_array_init(&self->recv_trailing_metadata);
+ self->send_metadata = NULL;
+ self->send_metadata_count = 0;
+ self->send_trailing_metadata = NULL;
+ self->send_trailing_metadata_count = 0;
+ self->send_message = NULL;
+ self->recv_message = NULL;
+ self->adding_to_trailing = 0;
+
+ return (PyObject *)self;
}
static void pygrpc_call_dealloc(Call *self) {
if (self->c_call != NULL) {
grpc_call_destroy(self->c_call);
}
+ Py_XDECREF(self->completion_queue);
+ Py_XDECREF(self->channel);
+ Py_XDECREF(self->server);
+ grpc_call_details_destroy(&self->call_details);
+ grpc_metadata_array_destroy(&self->recv_metadata);
+ grpc_metadata_array_destroy(&self->recv_trailing_metadata);
+ if (self->send_message) {
+ grpc_byte_buffer_destroy(self->send_message);
+ }
+ if (self->recv_message) {
+ grpc_byte_buffer_destroy(self->recv_message);
+ }
+ gpr_free(self->status_details);
+ gpr_free(self->send_metadata);
+ gpr_free(self->send_trailing_metadata);
self->ob_type->tp_free((PyObject *)self);
}
static const PyObject *pygrpc_call_invoke(Call *self, PyObject *args) {
- const PyObject *completion_queue;
- const PyObject *metadata_tag;
- const PyObject *finish_tag;
+ PyObject *completion_queue;
+ PyObject *metadata_tag;
+ PyObject *finish_tag;
grpc_call_error call_error;
const PyObject *result;
+ pygrpc_tag *c_init_metadata_tag;
+ pygrpc_tag *c_metadata_tag;
+ pygrpc_tag *c_finish_tag;
+ grpc_op send_initial_metadata;
+ grpc_op recv_initial_metadata;
+ grpc_op recv_status_on_client;
if (!(PyArg_ParseTuple(args, "O!OO:invoke", &pygrpc_CompletionQueueType,
&completion_queue, &metadata_tag, &finish_tag))) {
return NULL;
}
-
- call_error = grpc_call_invoke_old(
- self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue,
- (void *)metadata_tag, (void *)finish_tag, 0);
-
+ send_initial_metadata.op = GRPC_OP_SEND_INITIAL_METADATA;
+ send_initial_metadata.data.send_initial_metadata.metadata = self->send_metadata;
+ send_initial_metadata.data.send_initial_metadata.count = self->send_metadata_count;
+ recv_initial_metadata.op = GRPC_OP_RECV_INITIAL_METADATA;
+ recv_initial_metadata.data.recv_initial_metadata = &self->recv_metadata;
+ recv_status_on_client.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ recv_status_on_client.data.recv_status_on_client.trailing_metadata = &self->recv_trailing_metadata;
+ recv_status_on_client.data.recv_status_on_client.status = &self->status;
+ recv_status_on_client.data.recv_status_on_client.status_details = &self->status_details;
+ recv_status_on_client.data.recv_status_on_client.status_details_capacity = &self->status_details_capacity;
+ c_init_metadata_tag = pygrpc_tag_new(PYGRPC_INITIAL_METADATA, NULL, self);
+ c_metadata_tag = pygrpc_tag_new(PYGRPC_CLIENT_METADATA_READ, metadata_tag, self);
+ c_finish_tag = pygrpc_tag_new(PYGRPC_FINISHED_CLIENT, finish_tag, self);
+
+ call_error = grpc_call_start_batch(self->c_call, &send_initial_metadata, 1, c_init_metadata_tag);
+ result = pygrpc_translate_call_error(call_error);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_init_metadata_tag);
+ pygrpc_tag_destroy(c_metadata_tag);
+ pygrpc_tag_destroy(c_finish_tag);
+ return result;
+ }
+ call_error = grpc_call_start_batch(self->c_call, &recv_initial_metadata, 1, c_metadata_tag);
+ result = pygrpc_translate_call_error(call_error);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_metadata_tag);
+ pygrpc_tag_destroy(c_finish_tag);
+ return result;
+ }
+ call_error = grpc_call_start_batch(self->c_call, &recv_status_on_client, 1, c_finish_tag);
result = pygrpc_translate_call_error(call_error);
- if (result != NULL) {
- Py_INCREF(metadata_tag);
- Py_INCREF(finish_tag);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_finish_tag);
+ return result;
}
+
return result;
}
static const PyObject *pygrpc_call_write(Call *self, PyObject *args) {
const char *bytes;
int length;
- const PyObject *tag;
+ PyObject *tag;
gpr_slice slice;
grpc_byte_buffer *byte_buffer;
grpc_call_error call_error;
const PyObject *result;
+ pygrpc_tag *c_tag;
+ grpc_op op;
if (!(PyArg_ParseTuple(args, "s#O:write", &bytes, &length, &tag))) {
return NULL;
}
+ c_tag = pygrpc_tag_new(PYGRPC_WRITE_ACCEPTED, tag, self);
slice = gpr_slice_from_copied_buffer(bytes, length);
byte_buffer = grpc_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
- call_error =
- grpc_call_start_write_old(self->c_call, byte_buffer, (void *)tag, 0);
+ if (self->send_message) {
+ grpc_byte_buffer_destroy(self->send_message);
+ }
+ self->send_message = byte_buffer;
+
+ op.op = GRPC_OP_SEND_MESSAGE;
+ op.data.send_message = self->send_message;
- grpc_byte_buffer_destroy(byte_buffer);
+ call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
- if (result != NULL) {
- Py_INCREF(tag);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_tag);
}
return result;
}
@@ -127,36 +203,42 @@ static const PyObject *pygrpc_call_write(Call *self, PyObject *args) {
static const PyObject *pygrpc_call_complete(Call *self, PyObject *tag) {
grpc_call_error call_error;
const PyObject *result;
+ pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_FINISH_ACCEPTED, tag, self);
+ grpc_op op;
- call_error = grpc_call_writes_done_old(self->c_call, (void *)tag);
+ op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+
+ call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
- if (result != NULL) {
- Py_INCREF(tag);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_tag);
}
return result;
}
static const PyObject *pygrpc_call_accept(Call *self, PyObject *args) {
- const PyObject *completion_queue;
- const PyObject *tag;
+ PyObject *completion_queue;
+ PyObject *tag;
grpc_call_error call_error;
const PyObject *result;
+ pygrpc_tag *c_tag;
+ grpc_op op;
if (!(PyArg_ParseTuple(args, "O!O:accept", &pygrpc_CompletionQueueType,
&completion_queue, &tag))) {
return NULL;
}
- call_error = grpc_call_server_accept_old(
- self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue,
- (void *)tag);
- result = pygrpc_translate_call_error(call_error);
+ op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op.data.recv_close_on_server.cancelled = &self->cancelled;
+ c_tag = pygrpc_tag_new(PYGRPC_FINISHED_SERVER, tag, self);
- if (result != NULL) {
- Py_INCREF(tag);
+ call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
+ result = pygrpc_translate_call_error(call_error);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_tag);
}
-
return result;
}
@@ -171,24 +253,52 @@ static const PyObject *pygrpc_call_add_metadata(Call *self, PyObject *args) {
metadata.key = key;
metadata.value = value;
metadata.value_length = value_length;
- return pygrpc_translate_call_error(
- grpc_call_add_metadata_old(self->c_call, &metadata, 0));
+ if (self->adding_to_trailing) {
+ self->send_trailing_metadata = gpr_realloc(self->send_trailing_metadata, (self->send_trailing_metadata_count + 1) * sizeof(grpc_metadata));
+ self->send_trailing_metadata[self->send_trailing_metadata_count] = metadata;
+ self->send_trailing_metadata_count = self->send_trailing_metadata_count + 1;
+ } else {
+ self->send_metadata = gpr_realloc(self->send_metadata, (self->send_metadata_count + 1) * sizeof(grpc_metadata));
+ self->send_metadata[self->send_metadata_count] = metadata;
+ self->send_metadata_count = self->send_metadata_count + 1;
+ }
+ return pygrpc_translate_call_error(GRPC_CALL_OK);
}
static const PyObject *pygrpc_call_premetadata(Call *self) {
- return pygrpc_translate_call_error(
- grpc_call_server_end_initial_metadata_old(self->c_call, 0));
+ grpc_op op;
+ grpc_call_error call_error;
+ const PyObject *result;
+ pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_INITIAL_METADATA, NULL, self);
+ op.op = GRPC_OP_SEND_INITIAL_METADATA;
+ op.data.send_initial_metadata.metadata = self->send_metadata;
+ op.data.send_initial_metadata.count = self->send_metadata_count;
+ self->adding_to_trailing = 1;
+
+ call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
+ result = pygrpc_translate_call_error(call_error);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_tag);
+ }
+ return result;
}
static const PyObject *pygrpc_call_read(Call *self, PyObject *tag) {
+ grpc_op op;
grpc_call_error call_error;
const PyObject *result;
+ pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_READ, tag, self);
- call_error = grpc_call_start_read_old(self->c_call, (void *)tag);
-
+ op.op = GRPC_OP_RECV_MESSAGE;
+ if (self->recv_message) {
+ grpc_byte_buffer_destroy(self->recv_message);
+ self->recv_message = NULL;
+ }
+ op.data.recv_message = &self->recv_message;
+ call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
- if (result != NULL) {
- Py_INCREF(tag);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_tag);
}
return result;
}
@@ -197,15 +307,18 @@ static const PyObject *pygrpc_call_status(Call *self, PyObject *args) {
PyObject *status;
PyObject *code;
PyObject *details;
- const PyObject *tag;
+ PyObject *tag;
grpc_status_code c_code;
char *c_message;
grpc_call_error call_error;
const PyObject *result;
+ pygrpc_tag *c_tag;
+ grpc_op op;
if (!(PyArg_ParseTuple(args, "OO:status", &status, &tag))) {
return NULL;
}
+ c_tag = pygrpc_tag_new(PYGRPC_FINISH_ACCEPTED, tag, self);
code = PyObject_GetAttrString(status, "code");
if (code == NULL) {
@@ -227,13 +340,21 @@ static const PyObject *pygrpc_call_status(Call *self, PyObject *args) {
if (c_message == NULL) {
return NULL;
}
-
- call_error = grpc_call_start_write_status_old(self->c_call, c_code, c_message,
- (void *)tag);
-
+ if (self->status_details) {
+ gpr_free(self->status_details);
+ }
+ self->status_details = gpr_malloc(strlen(c_message)+1);
+ strcpy(self->status_details, c_message);
+ op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op.data.send_status_from_server.trailing_metadata_count = self->send_trailing_metadata_count;
+ op.data.send_status_from_server.trailing_metadata = self->send_trailing_metadata;
+ op.data.send_status_from_server.status = c_code;
+ op.data.send_status_from_server.status_details = self->status_details;
+
+ call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
- if (result != NULL) {
- Py_INCREF(tag);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_tag);
}
return result;
}
@@ -301,9 +422,9 @@ PyTypeObject pygrpc_CallType = {
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
- (initproc)pygrpc_call_init, /* tp_init */
+ 0, /* tp_init */
0, /* tp_alloc */
- PyType_GenericNew, /* tp_new */
+ pygrpc_call_new, /* tp_new */
};
int pygrpc_add_call(PyObject *module) {
diff --git a/src/python/src/grpc/_adapter/_call.h b/src/python/src/grpc/_adapter/_call.h
index c04a2285f7..fb9160901b 100644
--- a/src/python/src/grpc/_adapter/_call.h
+++ b/src/python/src/grpc/_adapter/_call.h
@@ -37,8 +37,36 @@
#include <Python.h>
#include <grpc/grpc.h>
+#include "grpc/_adapter/_completion_queue.h"
+#include "grpc/_adapter/_channel.h"
+#include "grpc/_adapter/_server.h"
+
typedef struct {
PyObject_HEAD
+
+ CompletionQueue *completion_queue;
+ Channel *channel;
+ Server *server;
+
+ /* Legacy state. */
+ grpc_call_details call_details;
+ grpc_metadata_array recv_metadata;
+ grpc_metadata_array recv_trailing_metadata;
+ grpc_metadata *send_metadata;
+ size_t send_metadata_count;
+ grpc_metadata *send_trailing_metadata;
+ size_t send_trailing_metadata_count;
+ int adding_to_trailing;
+
+ grpc_byte_buffer *send_message;
+ grpc_byte_buffer *recv_message;
+
+ grpc_status_code status;
+ char *status_details;
+ size_t status_details_capacity;
+
+ int cancelled;
+
grpc_call *c_call;
} Call;
diff --git a/src/python/src/grpc/_adapter/_completion_queue.c b/src/python/src/grpc/_adapter/_completion_queue.c
index a639eff53e..5f1cb2a3a6 100644
--- a/src/python/src/grpc/_adapter/_completion_queue.c
+++ b/src/python/src/grpc/_adapter/_completion_queue.c
@@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
#include "grpc/_adapter/_call.h"
+#include "grpc/_adapter/_tag.h"
static PyObject *status_class;
static PyObject *service_acceptance_class;
@@ -138,74 +139,70 @@ static PyObject *pygrpc_stop_event_args(grpc_event *c_event) {
}
static PyObject *pygrpc_write_event_args(grpc_event *c_event) {
- PyObject *write_accepted =
- c_event->data.write_accepted == GRPC_OP_OK ? Py_True : Py_False;
- return PyTuple_Pack(8, write_event_kind, (PyObject *)c_event->tag,
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
+ PyObject *write_accepted = Py_True;
+ return PyTuple_Pack(8, write_event_kind, user_tag,
write_accepted, Py_None, Py_None, Py_None, Py_None,
Py_None);
}
static PyObject *pygrpc_complete_event_args(grpc_event *c_event) {
- PyObject *complete_accepted =
- c_event->data.finish_accepted == GRPC_OP_OK ? Py_True : Py_False;
- return PyTuple_Pack(8, complete_event_kind, (PyObject *)c_event->tag,
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
+ PyObject *complete_accepted = Py_True;
+ return PyTuple_Pack(8, complete_event_kind, user_tag,
Py_None, complete_accepted, Py_None, Py_None, Py_None,
Py_None);
}
static PyObject *pygrpc_service_event_args(grpc_event *c_event) {
- if (c_event->data.server_rpc_new.method == NULL) {
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
+ if (tag->call->call_details.method == NULL) {
return PyTuple_Pack(
- 8, service_event_kind, c_event->tag, Py_None, Py_None, Py_None, Py_None,
+ 8, service_event_kind, user_tag, Py_None, Py_None, Py_None, Py_None,
Py_None, Py_None);
} else {
PyObject *method = NULL;
PyObject *host = NULL;
PyObject *service_deadline = NULL;
- Call *call = NULL;
PyObject *service_acceptance = NULL;
PyObject *metadata = NULL;
PyObject *event_args = NULL;
- method = PyBytes_FromString(c_event->data.server_rpc_new.method);
+ method = PyBytes_FromString(tag->call->call_details.method);
if (method == NULL) {
goto error;
}
- host = PyBytes_FromString(c_event->data.server_rpc_new.host);
+ host = PyBytes_FromString(tag->call->call_details.host);
if (host == NULL) {
goto error;
}
service_deadline =
- pygrpc_as_py_time(&c_event->data.server_rpc_new.deadline);
+ pygrpc_as_py_time(&tag->call->call_details.deadline);
if (service_deadline == NULL) {
goto error;
}
- call = PyObject_New(Call, &pygrpc_CallType);
- if (call == NULL) {
- goto error;
- }
- call->c_call = c_event->call;
-
service_acceptance =
- PyObject_CallFunctionObjArgs(service_acceptance_class, call, method,
- host, service_deadline, NULL);
+ PyObject_CallFunctionObjArgs(service_acceptance_class, tag->call,
+ method, host, service_deadline, NULL);
if (service_acceptance == NULL) {
goto error;
}
metadata = pygrpc_metadata_collection_get(
- c_event->data.server_rpc_new.metadata_elements,
- c_event->data.server_rpc_new.metadata_count);
+ tag->call->recv_metadata.metadata,
+ tag->call->recv_metadata.count);
event_args = PyTuple_Pack(8, service_event_kind,
- (PyObject *)c_event->tag, Py_None, Py_None,
+ user_tag, Py_None, Py_None,
service_acceptance, Py_None, Py_None,
metadata);
Py_DECREF(service_acceptance);
Py_DECREF(metadata);
error:
- Py_XDECREF(call);
Py_XDECREF(method);
Py_XDECREF(host);
Py_XDECREF(service_deadline);
@@ -215,8 +212,10 @@ error:
}
static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
- if (c_event->data.read == NULL) {
- return PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag,
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
+ if (tag->call->recv_message == NULL) {
+ return PyTuple_Pack(8, read_event_kind, user_tag,
Py_None, Py_None, Py_None, Py_None, Py_None, Py_None);
} else {
size_t length;
@@ -227,8 +226,8 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
PyObject *bytes;
PyObject *event_args;
- length = grpc_byte_buffer_length(c_event->data.read);
- reader = grpc_byte_buffer_reader_create(c_event->data.read);
+ length = grpc_byte_buffer_length(tag->call->recv_message);
+ reader = grpc_byte_buffer_reader_create(tag->call->recv_message);
c_bytes = gpr_malloc(length);
offset = 0;
while (grpc_byte_buffer_reader_next(reader, &slice)) {
@@ -242,7 +241,7 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
if (bytes == NULL) {
return NULL;
}
- event_args = PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag,
+ event_args = PyTuple_Pack(8, read_event_kind, user_tag,
Py_None, Py_None, Py_None, bytes, Py_None,
Py_None);
Py_DECREF(bytes);
@@ -251,32 +250,65 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
}
static PyObject *pygrpc_metadata_event_args(grpc_event *c_event) {
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
PyObject *metadata = pygrpc_metadata_collection_get(
- c_event->data.client_metadata_read.elements,
- c_event->data.client_metadata_read.count);
+ tag->call->recv_metadata.metadata,
+ tag->call->recv_metadata.count);
PyObject* result = PyTuple_Pack(
- 8, metadata_event_kind, (PyObject *)c_event->tag, Py_None, Py_None,
+ 8, metadata_event_kind, user_tag, Py_None, Py_None,
Py_None, Py_None, Py_None, metadata);
Py_DECREF(metadata);
return result;
}
-static PyObject *pygrpc_finished_event_args(grpc_event *c_event) {
+static PyObject *pygrpc_finished_server_event_args(grpc_event *c_event) {
+ PyObject *code;
+ PyObject *details;
+ PyObject *status;
+ PyObject *event_args;
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
+
+ code = pygrpc_status_code(tag->call->cancelled ? GRPC_STATUS_CANCELLED : GRPC_STATUS_OK);
+ if (code == NULL) {
+ PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!");
+ return NULL;
+ }
+ details = PyBytes_FromString("");
+ if (details == NULL) {
+ return NULL;
+ }
+ status = PyObject_CallFunctionObjArgs(status_class, code, details, NULL);
+ Py_DECREF(details);
+ if (status == NULL) {
+ return NULL;
+ }
+ event_args = PyTuple_Pack(8, finish_event_kind, user_tag,
+ Py_None, Py_None, Py_None, Py_None, status,
+ Py_None);
+ Py_DECREF(status);
+ return event_args;
+}
+
+static PyObject *pygrpc_finished_client_event_args(grpc_event *c_event) {
PyObject *code;
PyObject *details;
PyObject *status;
PyObject *event_args;
PyObject *metadata;
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
- code = pygrpc_status_code(c_event->data.finished.status);
+ code = pygrpc_status_code(tag->call->status);
if (code == NULL) {
PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!");
return NULL;
}
- if (c_event->data.finished.details == NULL) {
+ if (tag->call->status_details == NULL) {
details = PyBytes_FromString("");
} else {
- details = PyBytes_FromString(c_event->data.finished.details);
+ details = PyBytes_FromString(tag->call->status_details);
}
if (details == NULL) {
return NULL;
@@ -287,9 +319,9 @@ static PyObject *pygrpc_finished_event_args(grpc_event *c_event) {
return NULL;
}
metadata = pygrpc_metadata_collection_get(
- c_event->data.finished.metadata_elements,
- c_event->data.finished.metadata_count);
- event_args = PyTuple_Pack(8, finish_event_kind, (PyObject *)c_event->tag,
+ tag->call->recv_trailing_metadata.metadata,
+ tag->call->recv_trailing_metadata.count);
+ event_args = PyTuple_Pack(8, finish_event_kind, user_tag,
Py_None, Py_None, Py_None, Py_None, status,
metadata);
Py_DECREF(status);
@@ -348,28 +380,51 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self,
Py_RETURN_NONE;
}
+ pygrpc_tag *tag = (pygrpc_tag *)c_event->tag;
+
switch (c_event->type) {
case GRPC_QUEUE_SHUTDOWN:
event_args = pygrpc_stop_event_args(c_event);
break;
- case GRPC_WRITE_ACCEPTED:
- event_args = pygrpc_write_event_args(c_event);
- break;
- case GRPC_FINISH_ACCEPTED:
- event_args = pygrpc_complete_event_args(c_event);
- break;
- case GRPC_SERVER_RPC_NEW:
- event_args = pygrpc_service_event_args(c_event);
- break;
- case GRPC_READ:
- event_args = pygrpc_read_event_args(c_event);
- break;
- case GRPC_CLIENT_METADATA_READ:
- event_args = pygrpc_metadata_event_args(c_event);
- break;
- case GRPC_FINISHED:
- event_args = pygrpc_finished_event_args(c_event);
+ case GRPC_OP_COMPLETE: {
+ if (!tag) {
+ PyErr_SetString(PyExc_Exception, "Unrecognized event type!");
+ return NULL;
+ }
+ switch (tag->type) {
+ case PYGRPC_INITIAL_METADATA:
+ if (tag) {
+ pygrpc_tag_destroy(tag);
+ }
+ grpc_event_finish(c_event);
+ return pygrpc_completion_queue_get(self, args);
+ case PYGRPC_WRITE_ACCEPTED:
+ event_args = pygrpc_write_event_args(c_event);
+ break;
+ case PYGRPC_FINISH_ACCEPTED:
+ event_args = pygrpc_complete_event_args(c_event);
+ break;
+ case PYGRPC_SERVER_RPC_NEW:
+ event_args = pygrpc_service_event_args(c_event);
+ break;
+ case PYGRPC_READ:
+ event_args = pygrpc_read_event_args(c_event);
+ break;
+ case PYGRPC_CLIENT_METADATA_READ:
+ event_args = pygrpc_metadata_event_args(c_event);
+ break;
+ case PYGRPC_FINISHED_CLIENT:
+ event_args = pygrpc_finished_client_event_args(c_event);
+ break;
+ case PYGRPC_FINISHED_SERVER:
+ event_args = pygrpc_finished_server_event_args(c_event);
+ break;
+ default:
+ PyErr_SetString(PyExc_Exception, "Unrecognized op event type!");
+ return NULL;
+ }
break;
+ }
default:
PyErr_SetString(PyExc_Exception, "Unrecognized event type!");
return NULL;
@@ -382,7 +437,9 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self,
event = PyObject_CallObject(event_class, event_args);
Py_DECREF(event_args);
- Py_XDECREF((PyObject *)c_event->tag);
+ if (tag) {
+ pygrpc_tag_destroy(tag);
+ }
grpc_event_finish(c_event);
return event;
diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py
index 826c586a1e..09c4660a2b 100644
--- a/src/python/src/grpc/_adapter/_low_test.py
+++ b/src/python/src/grpc/_adapter/_low_test.py
@@ -56,7 +56,7 @@ class LonelyClientTest(unittest.TestCase):
completion_queue = _low.CompletionQueue()
channel = _low.Channel('%s:%d' % (host, port), None)
- client_call = _low.Call(channel, method, host, deadline)
+ client_call = _low.Call(channel, completion_queue, method, host, deadline)
client_call.invoke(completion_queue, metadata_tag, finish_tag)
first_event = completion_queue.get(after_deadline)
@@ -138,7 +138,8 @@ class EchoTest(unittest.TestCase):
server_data = []
client_data = []
- client_call = _low.Call(self.channel, method, self.host, deadline)
+ client_call = _low.Call(self.channel, self.client_completion_queue,
+ method, self.host, deadline)
client_call.add_metadata(client_metadata_key, client_metadata_value)
client_call.add_metadata(client_binary_metadata_key,
client_binary_metadata_value)
@@ -335,7 +336,8 @@ class CancellationTest(unittest.TestCase):
server_data = []
client_data = []
- client_call = _low.Call(self.channel, method, self.host, deadline)
+ client_call = _low.Call(self.channel, self.client_completion_queue,
+ method, self.host, deadline)
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
diff --git a/src/python/src/grpc/_adapter/_server.c b/src/python/src/grpc/_adapter/_server.c
index 181b6c21fc..e7c5917724 100644
--- a/src/python/src/grpc/_adapter/_server.c
+++ b/src/python/src/grpc/_adapter/_server.c
@@ -36,12 +36,14 @@
#include <Python.h>
#include <grpc/grpc.h>
+#include "grpc/_adapter/_call.h"
#include "grpc/_adapter/_completion_queue.h"
#include "grpc/_adapter/_error.h"
#include "grpc/_adapter/_server_credentials.h"
+#include "grpc/_adapter/_tag.h"
static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) {
- const PyObject *completion_queue;
+ CompletionQueue *completion_queue;
static char *kwlist[] = {"completion_queue", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!:Server", kwlist,
@@ -50,7 +52,9 @@ static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) {
return -1;
}
self->c_server = grpc_server_create(
- ((CompletionQueue *)completion_queue)->c_completion_queue, NULL);
+ completion_queue->c_completion_queue, NULL);
+ self->completion_queue = completion_queue;
+ Py_INCREF(completion_queue);
return 0;
}
@@ -58,6 +62,7 @@ static void pygrpc_server_dealloc(Server *self) {
if (self->c_server != NULL) {
grpc_server_destroy(self->c_server);
}
+ Py_XDECREF(self->completion_queue);
self->ob_type->tp_free((PyObject *)self);
}
@@ -109,8 +114,15 @@ static PyObject *pygrpc_server_start(Server *self) {
static const PyObject *pygrpc_server_service(Server *self, PyObject *tag) {
grpc_call_error call_error;
const PyObject *result;
-
- call_error = grpc_server_request_call_old(self->c_server, (void *)tag);
+ pygrpc_tag *c_tag = pygrpc_tag_new_server_rpc_call(tag);
+ c_tag->call->completion_queue = self->completion_queue;
+ c_tag->call->server = self;
+ Py_INCREF(c_tag->call->completion_queue);
+ Py_INCREF(c_tag->call->server);
+ call_error = grpc_server_request_call(
+ self->c_server, &c_tag->call->c_call, &c_tag->call->call_details,
+ &c_tag->call->recv_metadata, self->completion_queue->c_completion_queue,
+ c_tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
diff --git a/src/python/src/grpc/_adapter/_server.h b/src/python/src/grpc/_adapter/_server.h
index 4836bb638c..d31d4e678b 100644
--- a/src/python/src/grpc/_adapter/_server.h
+++ b/src/python/src/grpc/_adapter/_server.h
@@ -37,8 +37,12 @@
#include <Python.h>
#include <grpc/grpc.h>
+#include "grpc/_adapter/_completion_queue.h"
+
typedef struct {
PyObject_HEAD
+
+ CompletionQueue *completion_queue;
grpc_server *c_server;
} Server;
diff --git a/src/python/src/grpc/_adapter/_tag.c b/src/python/src/grpc/_adapter/_tag.c
new file mode 100644
index 0000000000..9c6ee19d79
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_tag.c
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "grpc/_adapter/_tag.h"
+
+#include <Python.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+
+pygrpc_tag *pygrpc_tag_new(pygrpc_tag_type type, PyObject *user_tag,
+ Call *call) {
+ pygrpc_tag *self = (pygrpc_tag *)gpr_malloc(sizeof(pygrpc_tag));
+ memset(self, 0, sizeof(pygrpc_tag));
+ if (user_tag == NULL) {
+ self->user_tag = Py_None;
+ } else {
+ self->user_tag = user_tag;
+ }
+ Py_INCREF(self->user_tag);
+ self->type = type;
+ self->call = call;
+ Py_INCREF(call);
+ return self;
+}
+
+pygrpc_tag *pygrpc_tag_new_server_rpc_call(PyObject *user_tag) {
+ return pygrpc_tag_new(PYGRPC_SERVER_RPC_NEW, user_tag,
+ (Call *)pygrpc_CallType.tp_alloc(&pygrpc_CallType, 0));
+}
+
+void pygrpc_tag_destroy(pygrpc_tag *self) {
+ Py_XDECREF(self->user_tag);
+ Py_XDECREF(self->call);
+ gpr_free(self);
+}
diff --git a/src/python/src/grpc/_adapter/_tag.h b/src/python/src/grpc/_adapter/_tag.h
new file mode 100644
index 0000000000..82987ea102
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_tag.h
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef _ADAPTER__TAG_H_
+#define _ADAPTER__TAG_H_
+
+#include <Python.h>
+#include <grpc/grpc.h>
+
+#include "grpc/_adapter/_call.h"
+#include "grpc/_adapter/_completion_queue.h"
+
+/* grpc_completion_type is becoming meaningless in grpc_event; this is a partial
+ replacement for its descriptive functionality until Python can move its whole
+ C and C adapter stack to more closely resemble the core batching API. */
+typedef enum {
+ PYGRPC_SERVER_RPC_NEW = 0,
+ PYGRPC_INITIAL_METADATA = 1,
+ PYGRPC_READ = 2,
+ PYGRPC_WRITE_ACCEPTED = 3,
+ PYGRPC_FINISH_ACCEPTED = 4,
+ PYGRPC_CLIENT_METADATA_READ = 5,
+ PYGRPC_FINISHED_CLIENT = 6,
+ PYGRPC_FINISHED_SERVER = 7,
+} pygrpc_tag_type;
+
+typedef struct {
+ pygrpc_tag_type type;
+ PyObject *user_tag;
+
+ Call *call;
+} pygrpc_tag;
+
+pygrpc_tag *pygrpc_tag_new(pygrpc_tag_type type, PyObject *user_tag,
+ Call *call);
+pygrpc_tag *pygrpc_tag_new_server_rpc_call(PyObject *user_tag);
+void pygrpc_tag_destroy(pygrpc_tag *self);
+
+#endif /* _ADAPTER__TAG_H_ */
+
diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py
index 2b93aa6331..dd0a486117 100644
--- a/src/python/src/grpc/_adapter/rear.py
+++ b/src/python/src/grpc/_adapter/rear.py
@@ -246,7 +246,7 @@ class RearLink(base_interfaces.RearLink, activated.Activated):
timeout: A duration of time in seconds to allow for the RPC.
"""
request_serializer = self._request_serializers[name]
- call = _low.Call(self._channel, name, self._host, time.time() + timeout)
+ call = _low.Call(self._channel, self._completion_queue, name, self._host, time.time() + timeout)
if self._metadata_transformer is not None:
metadata = self._metadata_transformer([])
for metadata_key, metadata_value in metadata:
diff --git a/src/python/src/setup.py b/src/python/src/setup.py
index 32ac41e285..7149a9eb85 100644
--- a/src/python/src/setup.py
+++ b/src/python/src/setup.py
@@ -42,6 +42,7 @@ _EXTENSION_SOURCES = (
'grpc/_adapter/_server.c',
'grpc/_adapter/_client_credentials.c',
'grpc/_adapter/_server_credentials.c',
+ 'grpc/_adapter/_tag.c'
)
_EXTENSION_INCLUDE_DIRECTORIES = (