diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/iomgr/iomgr.c | 8 | ||||
-rw-r--r-- | src/core/support/cpu_linux.c | 35 | ||||
-rw-r--r-- | src/core/support/sync_posix.c | 11 | ||||
-rw-r--r-- | src/core/support/thd_posix.c | 8 | ||||
-rw-r--r-- | src/core/support/thd_win32.c | 8 | ||||
-rw-r--r-- | src/core/support/time.c | 16 | ||||
-rw-r--r-- | src/core/support/time_posix.c | 25 | ||||
-rw-r--r-- | src/core/surface/call.c | 1 | ||||
-rw-r--r-- | src/core/tsi/ssl_transport_security.c | 22 | ||||
-rw-r--r-- | src/cpp/server/async_server_context.cc | 2 | ||||
-rw-r--r-- | src/cpp/server/server_rpc_handler.cc | 2 | ||||
-rw-r--r-- | src/csharp/.gitignore | 2 | ||||
-rwxr-xr-x | src/csharp/README.md | 22 | ||||
-rw-r--r-- | src/csharp/ext/grpc_csharp_ext.c | 113 | ||||
-rw-r--r-- | src/node/ext/timeval.cc | 5 | ||||
-rw-r--r-- | src/php/ext/grpc/call.c | 114 | ||||
-rw-r--r-- | src/php/ext/grpc/call.h | 1 | ||||
-rw-r--r-- | src/php/ext/grpc/server.c | 4 | ||||
-rwxr-xr-x | src/php/tests/interop/interop_client.php | 21 | ||||
-rwxr-xr-x | src/php/tests/unit_tests/CallTest.php | 9 | ||||
-rwxr-xr-x | src/php/tests/unit_tests/EndToEndTest.php | 6 | ||||
-rwxr-xr-x | src/php/tests/unit_tests/SecureEndToEndTest.php | 5 | ||||
-rwxr-xr-x | src/php/tests/util/port_picker.php | 6 |
23 files changed, 306 insertions, 140 deletions
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 8989b491d5..3d6114ca18 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -51,6 +51,7 @@ typedef struct delayed_callback { static gpr_mu g_mu; static gpr_cv g_cv; +static gpr_cv g_rcv; static delayed_callback *g_cbs_head = NULL; static delayed_callback *g_cbs_tail = NULL; static int g_shutdown; @@ -86,6 +87,7 @@ void grpc_iomgr_init(void) { gpr_thd_id id; gpr_mu_init(&g_mu); gpr_cv_init(&g_cv); + gpr_cv_init(&g_rcv); grpc_alarm_list_init(gpr_now()); g_refs = 0; grpc_iomgr_platform_init(); @@ -115,7 +117,7 @@ void grpc_iomgr_shutdown(void) { gpr_mu_lock(&g_mu); } if (g_refs) { - if (gpr_cv_wait(&g_cv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) { + if (gpr_cv_wait(&g_rcv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) { gpr_log(GPR_DEBUG, "Failed to free %d iomgr objects before shutdown deadline: " "memory leaks are likely", @@ -126,12 +128,14 @@ void grpc_iomgr_shutdown(void) { } gpr_mu_unlock(&g_mu); + grpc_kick_poller(); gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future); grpc_iomgr_platform_shutdown(); grpc_alarm_list_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_cv); + gpr_cv_destroy(&g_rcv); } void grpc_iomgr_ref(void) { @@ -143,7 +147,7 @@ void grpc_iomgr_ref(void) { void grpc_iomgr_unref(void) { gpr_mu_lock(&g_mu); if (0 == --g_refs) { - gpr_cv_signal(&g_cv); + gpr_cv_signal(&g_rcv); } gpr_mu_unlock(&g_mu); } diff --git a/src/core/support/cpu_linux.c b/src/core/support/cpu_linux.c index eab8b7fbd0..ad82174894 100644 --- a/src/core/support/cpu_linux.c +++ b/src/core/support/cpu_linux.c @@ -31,44 +31,17 @@ * */ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif /* _GNU_SOURCE */ + #include <grpc/support/port_platform.h> #ifdef GPR_CPU_LINUX #include "src/core/support/cpu.h" -#ifndef _GNU_SOURCE -#define _GNU_SOURCE -#define GRPC_GNU_SOURCE -#endif - -#ifndef __USE_GNU -#define __USE_GNU -#define GRPC_USE_GNU -#endif - -#ifndef __USE_MISC -#define __USE_MISC -#define GRPC_USE_MISC -#endif - #include <sched.h> - -#ifdef GRPC_GNU_SOURCE -#undef _GNU_SOURCE -#undef GRPC_GNU_SOURCE -#endif - -#ifdef GRPC_USE_GNU -#undef __USE_GNU -#undef GRPC_USE_GNU -#endif - -#ifdef GRPC_USE_MISC -#undef __USE_MISC -#undef GRPC_USE_MISC -#endif - #include <errno.h> #include <unistd.h> #include <string.h> diff --git a/src/core/support/sync_posix.c b/src/core/support/sync_posix.c index 7f0e4a95a4..a28a4c6bf4 100644 --- a/src/core/support/sync_posix.c +++ b/src/core/support/sync_posix.c @@ -33,11 +33,17 @@ /* Posix gpr synchroization support code. */ +#if !defined _POSIX_C_SOURCE || _POSIX_C_SOURCE < 199309L +#undef _POSIX_C_SOURCE +#define _POSIX_C_SOURCE 199309L +#endif + #include <grpc/support/port_platform.h> #ifdef GPR_POSIX_SYNC #include <errno.h> +#include <time.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> @@ -67,7 +73,10 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) { if (gpr_time_cmp(abs_deadline, gpr_inf_future) == 0) { err = pthread_cond_wait(cv, mu); } else { - err = pthread_cond_timedwait(cv, mu, &abs_deadline); + struct timespec abs_deadline_ts; + abs_deadline_ts.tv_sec = abs_deadline.tv_sec; + abs_deadline_ts.tv_nsec = abs_deadline.tv_nsec; + err = pthread_cond_timedwait(cv, mu, &abs_deadline_ts); } GPR_ASSERT(err == 0 || err == ETIMEDOUT || err == EAGAIN); return err == ETIMEDOUT; diff --git a/src/core/support/thd_posix.c b/src/core/support/thd_posix.c index bac1d9c220..74ca9424bb 100644 --- a/src/core/support/thd_posix.c +++ b/src/core/support/thd_posix.c @@ -62,17 +62,19 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, const gpr_thd_options *options) { int thread_started; pthread_attr_t attr; + pthread_t p; struct thd_arg *a = gpr_malloc(sizeof(*a)); a->body = thd_body; a->arg = arg; GPR_ASSERT(pthread_attr_init(&attr) == 0); GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == 0); - thread_started = (pthread_create(t, &attr, &thread_body, a) == 0); + thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0); GPR_ASSERT(pthread_attr_destroy(&attr) == 0); if (!thread_started) { gpr_free(a); } + *t = (gpr_thd_id)p; return thread_started; } @@ -82,4 +84,8 @@ gpr_thd_options gpr_thd_options_default(void) { return options; } +gpr_thd_id gpr_thd_currentid(void) { + return (gpr_thd_id)pthread_self(); +} + #endif /* GPR_POSIX_SYNC */ diff --git a/src/core/support/thd_win32.c b/src/core/support/thd_win32.c index 1762f87f3c..2ee1417048 100644 --- a/src/core/support/thd_win32.c +++ b/src/core/support/thd_win32.c @@ -58,16 +58,18 @@ static DWORD WINAPI thread_body(void *v) { int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, const gpr_thd_options *options) { HANDLE handle; + DWORD thread_id; struct thd_arg *a = gpr_malloc(sizeof(*a)); a->body = thd_body; a->arg = arg; *t = 0; - handle = CreateThread(NULL, 64 * 1024, thread_body, a, 0, NULL); + handle = CreateThread(NULL, 64 * 1024, thread_body, a, 0, &thread_id); if (handle == NULL) { gpr_free(a); } else { CloseHandle(handle); /* threads are "detached" */ } + *t = (gpr_thd_id)thread_id; return handle != NULL; } @@ -77,4 +79,8 @@ gpr_thd_options gpr_thd_options_default(void) { return options; } +gpr_thd_id gpr_thd_currentid(void) { + return (gpr_thd_id)GetCurrentThreadId(); +} + #endif /* GPR_WIN32 */ diff --git a/src/core/support/time.c b/src/core/support/time.c index 97243318fd..268a43c677 100644 --- a/src/core/support/time.c +++ b/src/core/support/time.c @@ -234,22 +234,6 @@ int gpr_time_similar(gpr_timespec a, gpr_timespec b, gpr_timespec threshold) { } } -struct timeval gpr_timeval_from_timespec(gpr_timespec t) { - /* TODO(klempner): Consider whether this should round up, since it is likely - to be used for delays */ - struct timeval tv; - tv.tv_sec = t.tv_sec; - tv.tv_usec = t.tv_nsec / 1000; - return tv; -} - -gpr_timespec gpr_timespec_from_timeval(struct timeval t) { - gpr_timespec ts; - ts.tv_sec = t.tv_sec; - ts.tv_nsec = t.tv_usec * 1000; - return ts; -} - gpr_int32 gpr_time_to_millis(gpr_timespec t) { if (t.tv_sec >= 2147483) { if (t.tv_sec == 2147483 && t.tv_nsec < 648 * GPR_NS_PER_MS) { diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index 9e11f8a865..7f0f028183 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -34,7 +34,8 @@ /* Posix code for gpr time support. */ /* So we get nanosleep and clock_* */ -#ifndef _POSIX_C_SOURCE +#if !defined _POSIX_C_SOURCE || _POSIX_C_SOURCE < 199309L +#undef _POSIX_C_SOURCE #define _POSIX_C_SOURCE 199309L #endif @@ -47,11 +48,25 @@ #include <unistd.h> #include <grpc/support/time.h> +static struct timespec timespec_from_gpr(gpr_timespec gts) { + struct timespec rv; + rv.tv_sec = gts.tv_sec; + rv.tv_nsec = gts.tv_nsec; + return rv; +} + #if _POSIX_TIMERS > 0 +static gpr_timespec gpr_from_timespec(struct timespec ts) { + gpr_timespec rv; + rv.tv_sec = ts.tv_sec; + rv.tv_nsec = ts.tv_nsec; + return rv; +} + gpr_timespec gpr_now(void) { - gpr_timespec now; + struct timespec now; clock_gettime(CLOCK_REALTIME, &now); - return now; + return gpr_from_timespec(now); } #else /* For some reason Apple's OSes haven't implemented clock_gettime. */ @@ -69,6 +84,7 @@ gpr_timespec gpr_now(void) { void gpr_sleep_until(gpr_timespec until) { gpr_timespec now; gpr_timespec delta; + struct timespec delta_ts; for (;;) { /* We could simplify by using clock_nanosleep instead, but it might be @@ -79,7 +95,8 @@ void gpr_sleep_until(gpr_timespec until) { } delta = gpr_time_sub(until, now); - if (nanosleep(&delta, NULL) == 0) { + delta_ts = timespec_from_gpr(delta); + if (nanosleep(&delta_ts, NULL) == 0) { break; } } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 5a24264cce..2b6f042eb9 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -318,6 +318,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, maybe_set_status_code(c, status); if (details) { maybe_set_status_details(c, details); + grpc_mdstr_unref(details); } gpr_mu_unlock(&c->read_mu); return grpc_call_cancel(c); diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 0f8cbccb62..e23421fc15 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -37,6 +37,7 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> +#include <grpc/support/thd.h> #include <grpc/support/useful.h> #include "src/core/tsi/transport_security.h" @@ -103,11 +104,32 @@ typedef struct { /* --- Library Initialization. ---*/ static gpr_once init_openssl_once = GPR_ONCE_INIT; +static gpr_mu *openssl_mutexes = NULL; + +static void openssl_locking_cb(int mode, int type, const char* file, int line) { + if (mode & CRYPTO_LOCK) { + gpr_mu_lock(&openssl_mutexes[type]); + } else { + gpr_mu_unlock(&openssl_mutexes[type]); + } +} + +static unsigned long openssl_thread_id_cb(void) { + return (unsigned long)gpr_thd_currentid(); +} static void init_openssl(void) { + int i; SSL_library_init(); SSL_load_error_strings(); OpenSSL_add_all_algorithms(); + openssl_mutexes = malloc(CRYPTO_num_locks() * sizeof(gpr_mu)); + GPR_ASSERT(openssl_mutexes != NULL); + for (i = 0; i < CRYPTO_num_locks(); i++) { + gpr_mu_init(&openssl_mutexes[i]); + } + CRYPTO_set_locking_callback(openssl_locking_cb); + CRYPTO_set_id_callback(openssl_thread_id_cb); } /* --- Ssl utils. ---*/ diff --git a/src/cpp/server/async_server_context.cc b/src/cpp/server/async_server_context.cc index 2dd3cd1e8e..886e794137 100644 --- a/src/cpp/server/async_server_context.cc +++ b/src/cpp/server/async_server_context.cc @@ -54,7 +54,7 @@ AsyncServerContext::~AsyncServerContext() { grpc_call_destroy(call_); } void AsyncServerContext::Accept(grpc_completion_queue *cq) { GPR_ASSERT(grpc_call_server_accept_old(call_, cq, this) == GRPC_CALL_OK); - GPR_ASSERT(grpc_call_server_end_initial_metadata_old(call_, 0) == + GPR_ASSERT(grpc_call_server_end_initial_metadata_old(call_, GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); } diff --git a/src/cpp/server/server_rpc_handler.cc b/src/cpp/server/server_rpc_handler.cc index 061ac1c2f3..bf02de8b80 100644 --- a/src/cpp/server/server_rpc_handler.cc +++ b/src/cpp/server/server_rpc_handler.cc @@ -77,7 +77,7 @@ void ServerRpcHandler::StartRpc() { if (status.IsOk()) { // Send the response if we get an ok status. - async_server_context_->StartWrite(*response, 0); + async_server_context_->StartWrite(*response, GRPC_WRITE_BUFFER_HINT); type = WaitForNextEvent(); if (type != CompletionQueue::SERVER_WRITE_OK) { status = Status(StatusCode::INTERNAL, "Error writing response."); diff --git a/src/csharp/.gitignore b/src/csharp/.gitignore new file mode 100644 index 0000000000..dbf38f34b7 --- /dev/null +++ b/src/csharp/.gitignore @@ -0,0 +1,2 @@ +*.userprefs +test-results diff --git a/src/csharp/README.md b/src/csharp/README.md new file mode 100755 index 0000000000..5b56303c14 --- /dev/null +++ b/src/csharp/README.md @@ -0,0 +1,22 @@ +gRPC C# +======= + +A C# implementation of gRPC, Google's RPC library. + +EXPERIMENTAL ONLY +----------------- + +**This gRPC C# implementation is work-in-progress and is not expected to work yet.** + +- The implementation is a wrapper around gRPC C core library +- Code only runs under mono currently, building gGRPC C core library under Windows + is in progress. +- It is very possible that some parts of the code will be heavily refactored or + completely rewritten. + +CONTENTS +-------- + +- ext: + The extension library that wraps C API to be more digestible by C#. + diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c new file mode 100644 index 0000000000..74d11c655b --- /dev/null +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -0,0 +1,113 @@ +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice.h> + +#include <string.h> + +grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) { + gpr_slice slice = gpr_slice_from_copied_buffer(buffer, len); + grpc_byte_buffer *bb = grpc_byte_buffer_create(&slice, 1); + gpr_slice_unref(slice); + return bb; +} + +void grpc_call_start_write_from_copied_buffer(grpc_call *call, + const char *buffer, size_t len, + void *tag, gpr_uint32 flags) { + grpc_byte_buffer *byte_buffer = string_to_byte_buffer(buffer, len); + GPR_ASSERT(grpc_call_start_write_old(call, byte_buffer, tag, flags) == + GRPC_CALL_OK); + grpc_byte_buffer_destroy(byte_buffer); +} + +grpc_completion_type grpc_event_type(const grpc_event *event) { + return event->type; +} + +grpc_op_error grpc_event_write_accepted(const grpc_event *event) { + GPR_ASSERT(event->type == GRPC_WRITE_ACCEPTED); + return event->data.invoke_accepted; +} + +grpc_op_error grpc_event_finish_accepted(const grpc_event *event) { + GPR_ASSERT(event->type == GRPC_FINISH_ACCEPTED); + return event->data.finish_accepted; +} + +grpc_status_code grpc_event_finished_status(const grpc_event *event) { + GPR_ASSERT(event->type == GRPC_FINISHED); + return event->data.finished.status; +} + +const char *grpc_event_finished_details(const grpc_event *event) { + GPR_ASSERT(event->type == GRPC_FINISHED); + return event->data.finished.details; +} + +gpr_intptr grpc_event_read_length(const grpc_event *event) { + GPR_ASSERT(event->type == GRPC_READ); + if (!event->data.read) { + return -1; + } + return grpc_byte_buffer_length(event->data.read); +} + +/* + * Copies data from read event to a buffer. Fatal error occurs if + * buffer is too small. + */ +void grpc_event_read_copy_to_buffer(const grpc_event *event, char *buffer, + size_t buffer_len) { + grpc_byte_buffer_reader *reader; + gpr_slice slice; + size_t offset = 0; + + GPR_ASSERT(event->type == GRPC_READ); + reader = grpc_byte_buffer_reader_create(event->data.read); + + GPR_ASSERT(event->data.read); + while (grpc_byte_buffer_reader_next(reader, &slice)) { + size_t len = GPR_SLICE_LENGTH(slice); + GPR_ASSERT(offset + len <= buffer_len); + memcpy(buffer + offset, GPR_SLICE_START_PTR(slice), + GPR_SLICE_LENGTH(slice)); + offset += len; + gpr_slice_unref(slice); + } + grpc_byte_buffer_reader_destroy(reader); +} + +grpc_call *grpc_event_call(const grpc_event *event) { + /* we only allow this for newly incoming server calls. */ + GPR_ASSERT(event->type == GRPC_SERVER_RPC_NEW); + return event->call; +} + +const char *grpc_event_server_rpc_new_method(const grpc_event *event) { + GPR_ASSERT(event->type == GRPC_SERVER_RPC_NEW); + return event->data.server_rpc_new.method; +} + +grpc_completion_type grpc_completion_queue_next_with_callback( + grpc_completion_queue *cq) { + grpc_event *ev; + grpc_completion_type t; + void (*callback)(grpc_event *); + + ev = grpc_completion_queue_next(cq, gpr_inf_future); + t = ev->type; + if (ev->tag) { + /* call the callback in ev->tag */ + /* C forbids to cast object pointers to function pointers, so + * we cast to intptr first. + */ + callback = (void (*)(grpc_event *))(gpr_intptr)ev->tag; + (*callback)(ev); + } + grpc_event_finish(ev); + + /* return completion type to allow some handling for events that have no + * tag - such as GRPC_QUEUE_SHUTDOWN + */ + return t; +} diff --git a/src/node/ext/timeval.cc b/src/node/ext/timeval.cc index 687e33576b..20d52f0963 100644 --- a/src/node/ext/timeval.cc +++ b/src/node/ext/timeval.cc @@ -56,9 +56,8 @@ double TimespecToMilliseconds(gpr_timespec timespec) { } else if (gpr_time_cmp(timespec, gpr_inf_past) == 0) { return -std::numeric_limits<double>::infinity(); } else { - struct timeval time = gpr_timeval_from_timespec(timespec); - return (static_cast<double>(time.tv_sec) * 1000 + - static_cast<double>(time.tv_usec) / 1000); + return (static_cast<double>(timespec.tv_sec) * 1000 + + static_cast<double>(timespec.tv_nsec) / 1000000); } } diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index bd3490b362..3bc9ce2bea 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -85,73 +85,25 @@ zval *grpc_call_create_metadata_array(int count, grpc_metadata *elements) { memcpy(str_val, elem->value, elem->value_length); if (zend_hash_find(array_hash, str_key, key_len, (void **)data) == SUCCESS) { - switch (Z_TYPE_P(*data)) { - case IS_STRING: - MAKE_STD_ZVAL(inner_array); - array_init(inner_array); - add_next_index_zval(inner_array, *data); - add_assoc_zval(array, str_key, inner_array); - break; - case IS_ARRAY: - inner_array = *data; - break; - default: - zend_throw_exception(zend_exception_get_default(), - "Metadata hash somehow contains wrong types.", - 1 TSRMLS_CC); + if (Z_TYPE_P(*data) != IS_ARRAY) { + zend_throw_exception(zend_exception_get_default(), + "Metadata hash somehow contains wrong types.", + 1 TSRMLS_CC); efree(str_key); efree(str_val); return NULL; } - add_next_index_stringl(inner_array, str_val, elem->value_length, false); + add_next_index_stringl(*data, str_val, elem->value_length, false); } else { - add_assoc_stringl(array, str_key, str_val, elem->value_length, false); + MAKE_STD_ZVAL(inner_array); + array_init(inner_array); + add_next_index_stringl(inner_array, str_val, elem->value_length, false); + add_assoc_zval(array, str_key, inner_array); } } return array; } -int php_grpc_call_add_metadata_array_walk(void *elem TSRMLS_DC, int num_args, - va_list args, - zend_hash_key *hash_key) { - grpc_call_error error_code; - zval **data = (zval **)elem; - grpc_metadata metadata; - grpc_call *call = va_arg(args, grpc_call *); - gpr_uint32 flags = va_arg(args, gpr_uint32); - const char *key; - HashTable *inner_hash; - /* We assume that either two args were passed, and we are in the recursive - case (and the second argument is the key), or one arg was passed and - hash_key is the string key. */ - if (num_args > 2) { - key = va_arg(args, const char *); - } else { - /* TODO(mlumish): If possible, check that hash_key is a string */ - key = hash_key->arKey; - } - switch (Z_TYPE_P(*data)) { - case IS_STRING: - metadata.key = (char *)key; - metadata.value = Z_STRVAL_P(*data); - metadata.value_length = Z_STRLEN_P(*data); - error_code = grpc_call_add_metadata_old(call, &metadata, 0u); - MAYBE_THROW_CALL_ERROR(add_metadata, error_code); - break; - case IS_ARRAY: - inner_hash = Z_ARRVAL_P(*data); - zend_hash_apply_with_arguments(inner_hash TSRMLS_CC, - php_grpc_call_add_metadata_array_walk, 3, - call, flags, key); - break; - default: - zend_throw_exception(zend_exception_get_default(), - "Metadata hash somehow contains wrong types.", - 1 TSRMLS_CC); - } - return ZEND_HASH_APPLY_KEEP; -} - /** * Constructs a new instance of the Call class. * @param Channel $channel The channel to associate the call with. Must not be @@ -204,8 +156,18 @@ PHP_METHOD(Call, __construct) { PHP_METHOD(Call, add_metadata) { wrapped_grpc_call *call = (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC); + grpc_metadata metadata; + grpc_call_error error_code; zval *array; + zval **inner_array; + zval **value; HashTable *array_hash; + HashPosition array_pointer; + HashTable *inner_array_hash; + HashPosition inner_array_pointer; + char *key; + uint key_len; + ulong index; long flags = 0; /* "a|l" == 1 array, 1 optional long */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a|l", &array, &flags) == @@ -216,9 +178,41 @@ PHP_METHOD(Call, add_metadata) { return; } array_hash = Z_ARRVAL_P(array); - zend_hash_apply_with_arguments(array_hash TSRMLS_CC, - php_grpc_call_add_metadata_array_walk, 2, - call->wrapped, (gpr_uint32)flags); + for (zend_hash_internal_pointer_reset_ex(array_hash, &array_pointer); + zend_hash_get_current_data_ex(array_hash, (void**)&inner_array, + &array_pointer) == SUCCESS; + zend_hash_move_forward_ex(array_hash, &array_pointer)) { + if (zend_hash_get_current_key_ex(array_hash, &key, &key_len, &index, 0, + &array_pointer) != HASH_KEY_IS_STRING) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "metadata keys must be strings", 1 TSRMLS_CC); + return; + } + if (Z_TYPE_P(*inner_array) != IS_ARRAY) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "metadata values must be arrays", + 1 TSRMLS_CC); + return; + } + inner_array_hash = Z_ARRVAL_P(*inner_array); + for (zend_hash_internal_pointer_reset_ex(inner_array_hash, + &inner_array_pointer); + zend_hash_get_current_data_ex(inner_array_hash, (void**)&value, + &inner_array_pointer) == SUCCESS; + zend_hash_move_forward_ex(inner_array_hash, &inner_array_pointer)) { + if (Z_TYPE_P(*value) != IS_STRING) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "metadata values must be arrays of strings", + 1 TSRMLS_CC); + return; + } + metadata.key = key; + metadata.value = Z_STRVAL_P(*value); + metadata.value_length = Z_STRLEN_P(*value); + error_code = grpc_call_add_metadata_old(call->wrapped, &metadata, 0u); + MAYBE_THROW_CALL_ERROR(add_metadata, error_code); + } + } } /** diff --git a/src/php/ext/grpc/call.h b/src/php/ext/grpc/call.h index 232c5d7cf2..ba1f1e797f 100644 --- a/src/php/ext/grpc/call.h +++ b/src/php/ext/grpc/call.h @@ -19,6 +19,7 @@ zend_throw_exception(spl_ce_LogicException, \ #func_name " was called incorrectly", \ (long)error_code TSRMLS_CC); \ + return; \ } \ } while (0) diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c index bc4fcf07c9..47ea38db0c 100644 --- a/src/php/ext/grpc/server.c +++ b/src/php/ext/grpc/server.c @@ -146,7 +146,7 @@ PHP_METHOD(Server, add_http2_port) { "add_http2_port expects a string", 1 TSRMLS_CC); return; } - RETURN_BOOL(grpc_server_add_http2_port(server->wrapped, addr)); + RETURN_LONG(grpc_server_add_http2_port(server->wrapped, addr)); } PHP_METHOD(Server, add_secure_http2_port) { @@ -161,7 +161,7 @@ PHP_METHOD(Server, add_secure_http2_port) { "add_http2_port expects a string", 1 TSRMLS_CC); return; } - RETURN_BOOL(grpc_server_add_secure_http2_port(server->wrapped, addr)); + RETURN_LONG(grpc_server_add_secure_http2_port(server->wrapped, addr)); } /** diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php index d1f994a84b..5266e9a9fa 100755 --- a/src/php/tests/interop/interop_client.php +++ b/src/php/tests/interop/interop_client.php @@ -149,6 +149,25 @@ function pingPong($stub) { 'Call did not complete successfully'); } +function cancelAfterFirstResponse($stub) { + $call = $stub->FullDuplexCall(); + $request = new grpc\testing\StreamingOutputCallRequest(); + $request->setResponseType(grpc\testing\PayloadType::COMPRESSABLE); + $response_parameters = new grpc\testing\ResponseParameters(); + $response_parameters->setSize(31415); + $request->addResponseParameters($response_parameters); + $payload = new grpc\testing\Payload(); + $payload->setBody(str_repeat("\0", 27182)); + $request->setPayload($payload); + + $call->write($request); + $response = $call->read(); + + $call->cancel(); + hardAssert($call->getStatus()->code === Grpc\STATUS_CANCELLED, + 'Call status was not CANCELLED'); +} + $args = getopt('', array('server_host:', 'server_port:', 'test_case:')); if (!array_key_exists('server_host', $args) || !array_key_exists('server_port', $args) || @@ -187,4 +206,6 @@ switch($args['test_case']) { case 'ping_pong': pingPong($stub); break; + case 'cancel_after_first_response': + cancelAfterFirstResponse($stub); }
\ No newline at end of file diff --git a/src/php/tests/unit_tests/CallTest.php b/src/php/tests/unit_tests/CallTest.php index 795831cb65..8f709b7e9c 100755 --- a/src/php/tests/unit_tests/CallTest.php +++ b/src/php/tests/unit_tests/CallTest.php @@ -1,16 +1,17 @@ <?php class CallTest extends PHPUnit_Framework_TestCase{ static $server; + static $port; public static function setUpBeforeClass() { $cq = new Grpc\CompletionQueue(); self::$server = new Grpc\Server($cq, []); - self::$server->add_http2_port('localhost:9001'); + self::$port = self::$server->add_http2_port('0.0.0.0:0'); } public function setUp() { $this->cq = new Grpc\CompletionQueue(); - $this->channel = new Grpc\Channel('localhost:9001', []); + $this->channel = new Grpc\Channel('localhost:' . self::$port, []); $this->call = new Grpc\Call($this->channel, '/foo', Grpc\Timeval::inf_future()); @@ -46,7 +47,7 @@ class CallTest extends PHPUnit_Framework_TestCase{ } public function testAddSingleMetadata() { - $this->call->add_metadata(['key' => 'value'], 0); + $this->call->add_metadata(['key' => ['value']], 0); /* Dummy assert: Checks that the previous call completed without error */ $this->assertTrue(true); } @@ -59,7 +60,7 @@ class CallTest extends PHPUnit_Framework_TestCase{ public function testAddSingleAndMultiValueMetadata() { $this->call->add_metadata( - ['key1' => 'value1', + ['key1' => ['value1'], 'key2' => ['value2', 'value3']], 0); /* Dummy assert: Checks that the previous call completed without error */ $this->assertTrue(true); diff --git a/src/php/tests/unit_tests/EndToEndTest.php b/src/php/tests/unit_tests/EndToEndTest.php index a2d8029b04..05104c0e12 100755 --- a/src/php/tests/unit_tests/EndToEndTest.php +++ b/src/php/tests/unit_tests/EndToEndTest.php @@ -1,13 +1,11 @@ <?php -require __DIR__ . '/../util/port_picker.php'; class EndToEndTest extends PHPUnit_Framework_TestCase{ public function setUp() { $this->client_queue = new Grpc\CompletionQueue(); $this->server_queue = new Grpc\CompletionQueue(); $this->server = new Grpc\Server($this->server_queue, []); - $address = '127.0.0.1:' . getNewPort(); - $this->server->add_http2_port($address); - $this->channel = new Grpc\Channel($address, []); + $port = $this->server->add_http2_port('0.0.0.0:0'); + $this->channel = new Grpc\Channel('localhost:' . $port, []); } public function tearDown() { diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php index 7ba4984bd8..5e95b11b44 100755 --- a/src/php/tests/unit_tests/SecureEndToEndTest.php +++ b/src/php/tests/unit_tests/SecureEndToEndTest.php @@ -11,10 +11,9 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ file_get_contents(dirname(__FILE__) . '/../data/server1.pem')); $this->server = new Grpc\Server($this->server_queue, ['credentials' => $server_credentials]); - $address = '127.0.0.1:' . getNewPort(); - $this->server->add_secure_http2_port($address); + $port = $this->server->add_secure_http2_port('0.0.0.0:0'); $this->channel = new Grpc\Channel( - $address, + 'localhost:' . $port, [ 'grpc.ssl_target_name_override' => 'foo.test.google.com', 'credentials' => $credentials diff --git a/src/php/tests/util/port_picker.php b/src/php/tests/util/port_picker.php deleted file mode 100755 index d869d8b0a4..0000000000 --- a/src/php/tests/util/port_picker.php +++ /dev/null @@ -1,6 +0,0 @@ -<?php -function getNewPort() { - static $port = 10000; - $port += 1; - return $port; -}
\ No newline at end of file |