From 124bccf49c41e6747d4942f1dd338d3673c7d515 Mon Sep 17 00:00:00 2001 From: Karl Ramm Date: Sat, 28 Sep 2013 19:22:44 -0400 Subject: rename zephyr python modules to zephyr_ctypes Having more plausible claimants to the title of "python zephyr module" installed was interfering with builds. --- debian/libzephyr-python.files | 2 +- debian/rules | 2 +- lib/zephyr_tests.py | 2 +- lib/zephyr_tests.txt | 2 +- python/zephyr.py | 486 ------------------------------------------ python/zephyr_ctypes.py | 486 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 490 insertions(+), 490 deletions(-) delete mode 100755 python/zephyr.py create mode 100755 python/zephyr_ctypes.py diff --git a/debian/libzephyr-python.files b/debian/libzephyr-python.files index 5e373d8..4188d73 100644 --- a/debian/libzephyr-python.files +++ b/debian/libzephyr-python.files @@ -1 +1 @@ -usr/lib/python2.5/site-packages/zephyr.py +usr/lib/python2.5/site-packages/zephyr_ctypes.py diff --git a/debian/rules b/debian/rules index eba0709..e931af8 100755 --- a/debian/rules +++ b/debian/rules @@ -130,7 +130,7 @@ install: build cp -rp debian/tmp/etc/zephyr/acl debian/tmp-$$dir/etc/zephyr; \ done mkdir -p debian/tmp/usr/lib/python2.5/site-packages - install -c -m 644 python/zephyr.py debian/tmp/usr/lib/python2.5/site-packages + install -c -m 644 python/zephyr_ctypes.py debian/tmp/usr/lib/python2.5/site-packages # Build architecture-independent files here. # Pass -i to all debhelper commands in this target to reduce clutter. diff --git a/lib/zephyr_tests.py b/lib/zephyr_tests.py index 4685c87..d8a0aca 100755 --- a/lib/zephyr_tests.py +++ b/lib/zephyr_tests.py @@ -16,7 +16,7 @@ import ctypes.util import time from ctypes import c_int, c_char, POINTER, c_char_p, sizeof -from zephyr import * +from zephyr_ctypes import * __revision__ = "$Id$" try: diff --git a/lib/zephyr_tests.txt b/lib/zephyr_tests.txt index e3fe0ab..38aa799 100644 --- a/lib/zephyr_tests.txt +++ b/lib/zephyr_tests.txt @@ -189,7 +189,7 @@ Trivial test of ZOpenPort and ZClosePort: >>> assert port != 0 >>> print type(_z.ZGetDestAddr()) - + >>> print _z.ZGetDestAddr().sin_family.pformat() ['AF_INET(2)'] >>> print _z.ZGetDestAddr().sin_addr.pformat() diff --git a/python/zephyr.py b/python/zephyr.py deleted file mode 100755 index 9825ada..0000000 --- a/python/zephyr.py +++ /dev/null @@ -1,486 +0,0 @@ -#!/usr/bin/python - -"""rough bindings for libzephyr""" - -import socket -import struct -import ctypes -import ctypes.util -import time -import sys -from ctypes import c_int, c_uint, c_ushort, c_char, c_ubyte -from ctypes import c_uint16, c_uint32 -from ctypes import POINTER, c_void_p, c_char_p -from ctypes import Structure, Union, sizeof - -__revision__ = "$Id$" -try: - __version__ = "%s/%s" % (__revision__.split()[3], __revision__.split()[2]) -except IndexError: - __version__ = "unknown" - -def print_line_or_lines(results, indent): - """short values on same line, multi-line on later ones...""" - if len(results) == 1: - print results[0] - else: - print - for result in results: - print indent + result - -def ctypes_pprint(cstruct, indent=""): - """pretty print a ctypes Structure or Union""" - - for field_name, field_ctype in cstruct._fields_: - field_value = getattr(cstruct, field_name) - print indent + field_name, - next_indent = indent + " " - pprint_name = "pprint_%s" % field_name - pformat_name = "pformat_%s" % field_name - if hasattr(cstruct, pprint_name): - # no longer used - getattr(cstruct, pprint_name)(next_indent) - elif hasattr(cstruct, pformat_name): - # counted-array and other common cases - print_line_or_lines(getattr(cstruct, pformat_name)(), next_indent) - elif hasattr(field_value, "pformat"): - # common for small compound types - print_line_or_lines(field_value.pformat(), next_indent) - elif hasattr(field_value, "pprint"): - # useful for Union selectors - field_value.pprint(next_indent) - elif hasattr(field_value, "_fields_"): - # generic recursion - print - ctypes_pprint(field_value, next_indent) - else: - # generic simple (or unknown/uninteresting) value - print field_value - -class Enum(c_int): - def pformat(self): - try: - return ["%s(%d)" % (self._values_[self.value], self.value)] - except IndexError: - return ["unknown enum value(%d)" % (self.value)] - -def populate_enum(cls): - """make members for each of the enum values""" - for value, tag in enumerate(cls._values_): - setattr(cls, tag, cls(value)) - -# not really an enum, but we get a richer effect by treating it as one -class Enum_u16(c_uint16): - def pformat(self): - try: - return ["%s(%d)" % (self._values_[self.value], self.value)] - except IndexError: - return ["unknown enum value(%d)" % (self.value)] - -class Enum_u8(c_ubyte): - def pformat(self): - try: - return ["%s(%d)" % (self._values_[self.value], self.value)] - except IndexError: - return ["unknown enum value(%d)" % (self.value)] - -# POSIX socket types... -class in_addr(Structure): - _fields_ = [ - ("s_addr", c_uint32), - ] - def pformat(self): - return [socket.inet_ntoa(struct.pack("= 0: - sa_has_len = True -elif sys.platform.startswith("linux"): - sa_has_len = False -else: - # ?? - sa_has_len = False - -if sa_has_len: - af_base_type = Enum_u8 -else: - af_base_type = Enum_u16 - -class AF_(af_base_type): - _socket_af = dict([(v,n) for n,v in socket.__dict__.items() if n.startswith("AF_")]) - _values_ = [_socket_af.get(k, "unknown address family") for k in range(min(_socket_af), max(_socket_af)+1)] - -populate_enum(AF_) - -def maybe_add_len_field(len_name, other_fields): - if sa_has_len: - return [(len_name, c_ubyte)] + other_fields - else: - return other_fields - -class sockaddr(Structure): - _fields_ = maybe_add_len_field("sa_len", [ - ("sa_family", AF_), - ("sa_data", c_char * 14), - ]) - -class sockaddr_in(Structure): - _fields_ = maybe_add_len_field("sin_len", [ - ("sin_family", AF_), - ("sin_port", c_uint16), - ("sin_addr", in_addr), - # hack from linux - do we actually need it? - ("sin_zero", c_ubyte * (sizeof(sockaddr)-sizeof(c_uint16)-sizeof(c_uint16)-sizeof(in_addr))), - ]) - def pformat_sin_zero(self): - return ["[ignored]"] - -# RFC2553... -class sockaddr_in6(Structure): - _fields_ = maybe_add_len_field("sin6_len", [ - ("sin6_family", AF_), - ("sin6_port", c_uint16), - ("sin6_flowinfo", c_uint32), - ("sin6_addr", in6_addr), - ("sin6_scope_id", c_uint32), - ]) - -# zephyr/zephyr.h -#define Z_MAXOTHERFIELDS 10 /* Max unknown fields in ZNotice_t */ -Z_MAXOTHERFIELDS = 10 -#define ZAUTH (ZMakeAuthentication) -#define ZCAUTH (ZMakeZcodeAuthentication) -#define ZNOAUTH ((Z_AuthProc)0) -ZNOAUTH = 0 - -# typedef enum { -# UNSAFE, UNACKED, ACKED, HMACK, HMCTL, SERVACK, SERVNAK, CLIENTACK, STAT -# } ZNotice_Kind_t; -# extern const char *ZNoticeKinds[9]; - -class ZNotice_Kind_t(Enum): - _values_ = [ - "UNSAFE", "UNACKED", "ACKED", "HMACK", "HMCTL", "SERVACK", "SERVNAK", "CLIENTACK", "STAT", - ] -populate_enum(ZNotice_Kind_t) - -def pformat_timeval(tv_sec, tv_usec): - """format timeval parts as seconds and human-readable time""" - try: - timestr = time.ctime(tv_sec) - except ValueError: - timestr = "invalid unix time" - if tv_usec >= 1000000 or tv_usec < 0: - # invalid usec, still treat as numbers - return ["%dsec, %dusec (bad) (%s)" % (tv_sec, tv_usec, timestr)] - return ["%d.%06dsec (%s)" % (tv_sec, tv_usec, timestr)] - -# struct _ZTimeval { -class _ZTimeval(Structure): - _fields_ = [ -# int tv_sec; - ("tv_sec", c_int), -# int tv_usec; - ("tv_usec", c_int), -# }; - ] - def pformat(self): - return pformat_timeval(self.tv_sec, self.tv_usec) - - -class _ZTimeval_Net(_ZTimeval): - """When _ZTimeval is used in a ZUnique_Id_t, the time parts are - stored in network byte order. Handle this by faking up a different type.""" - def pformat(self): - return pformat_timeval(socket.ntohl(self.tv_sec & 0xffffffff), socket.ntohl(self.tv_usec & 0xffffffff)) - -# typedef struct _ZUnique_Id_t { -class ZUnique_Id_t(Structure): - _fields_ = [ - # struct in_addr zuid_addr; - ("zuid_addr", in_addr), - # struct _ZTimeval tv; - ("tv", _ZTimeval_Net), - # } ZUnique_Id_t; - ] - -# union { -class _U_z_sender_sockaddr(Union): - _fields_ = [ - # struct sockaddr sa; - ("sa", sockaddr), - # struct sockaddr_in ip4; - ("ip4", sockaddr_in), - # struct sockaddr_in6 ip6; - ("ip6", sockaddr_in6), - # } z_sender_sockaddr; - ] - def pprint(self, indent): - print - if self.sa.sa_family.value == socket.AF_INET: - ctypes_pprint(self.ip4, indent + ".ip4:") - elif self.sa.sa_family.value == socket.AF_INET6: - ctypes_pprint(self.ip6, indent + ".ip6:") - else: - ctypes_pprint(self.sa, indent + ".sa:") - -# typedef struct _ZNotice_t { -class ZNotice_t(Structure): - _fields_ = [ - # char *z_packet; - ("z_packet", c_char_p), - # char *z_version; - ("z_version", c_char_p), - # ZNotice_Kind_t z_kind; - ("z_kind", ZNotice_Kind_t), - # ZUnique_Id_t z_uid; - ("z_uid", ZUnique_Id_t), - # union { - # struct sockaddr sa; - # struct sockaddr_in ip4; - # struct sockaddr_in6 ip6; - # } z_sender_sockaddr; - ("z_sender_sockaddr", _U_z_sender_sockaddr), - - # /* heavily deprecated: */ - # #define z_sender_addr z_sender_sockaddr.ip4.sin_addr - # /* probably a bad idea?: */ - # struct _ZTimeval z_time; - ("z_time", _ZTimeval), - # unsigned short z_port; - ("z_port", c_ushort), - # unsigned short z_charset; - ("z_charset", c_ushort), - # int z_auth; - ("z_auth", c_int), - # int z_checked_auth; - # TODO: fake enum, for display - ("z_checked_auth", c_int), - # int z_authent_len; - ("z_authent_len", c_int), - # char *z_ascii_authent; - ("z_ascii_authent", c_char_p), - # char *z_class; - ("z_class", c_char_p), - # char *z_class_inst; - ("z_class_inst", c_char_p), - # char *z_opcode; - ("z_opcode", c_char_p), - # char *z_sender; - ("z_sender", c_char_p), - # char *z_recipient; - ("z_recipient", c_char_p), - # char *z_default_format; - ("z_default_format", c_char_p), - # char *z_multinotice; - ("z_multinotice", c_char_p), - # ZUnique_Id_t z_multiuid; - ("z_multiuid", ZUnique_Id_t), - # ZChecksum_t z_checksum; - ("z_checksum", c_uint), - # char *z_ascii_checksum; - ("z_ascii_checksum", c_char_p), - # int z_num_other_fields; - ("z_num_other_fields", c_int), - # char *z_other_fields[Z_MAXOTHERFIELDS]; - ("z_other_fields", c_char_p * Z_MAXOTHERFIELDS), - # caddr_t z_message; - ("z_message", c_char_p), # not 1980 - # int z_message_len; - ("z_message_len", c_int), - # int z_num_hdr_fields; - ("z_num_hdr_fields", c_int), - # char **z_hdr_fields; - ("z_hdr_fields", POINTER(c_char_p)), - # } ZNotice_t; - ] - def pformat_z_other_fields(self): - return ["%d: %s" % (n, self.z_other_fields[n]) - for n in range(Z_MAXOTHERFIELDS)] - def pformat_z_hdr_fields(self): - if not self.z_hdr_fields: - return ["NULL"] - return ["%d: %s" % (n, self.z_hdr_fields[n]) - for n in range(self.z_num_hdr_fields)] - -class libZephyr(object): - """wrappers for functions in libZephyr""" - testable_funcs = [ - "ZInitialize", - "ZGetFD", - "ZGetRealm", - "ZGetSender", - "Z_FormatRawHeader", - "ZParseNotice", - "ZFormatNotice", - "ZCompareUID", - "ZExpandRealm", - "ZGetCharsetString", - "ZGetCharset", - "ZCharsetToString", - "ZTransliterate", - "ZOpenPort", - "ZClosePort", - "ZMakeAscii", - "ZMakeZcode", - "ZGetDestAddr", - "ZSetFD", - "ZPending", - ] - def __init__(self, library_path=None): - """connect to the library and build the wrappers""" - if not library_path: - library_path = ctypes.util.find_library("zephyr") - self._lib = ctypes.cdll.LoadLibrary(library_path) - - # grab the Zauthtype variable - self.Zauthtype = ctypes.c_int.in_dll(self._lib, 'Zauthtype').value - - # generic bindings? - for funcname in self.testable_funcs: - setattr(self, funcname, getattr(self._lib, funcname)) - - # TODO: fix return types, caller types in a more generic way later - # (perhaps by parsing the headers or code) - # perhaps metaprogramming or decorators... - self.ZGetRealm.restype = ctypes.c_char_p - self.ZGetSender.restype = ctypes.c_char_p - - # Code_t - # Z_FormatRawHeader(ZNotice_t *notice, - # char *buffer, - # int buffer_len, - # int *len, - # char **cstart, - # char **cend) - # This stuffs a notice into a buffer; cstart/cend point into the checksum in buffer - self.Z_FormatRawHeader.argtypes = [ - c_void_p, # *notice - c_char_p, # *buffer - c_int, # buffer_len - POINTER(c_int), # *len - POINTER(c_char_p), # **cstart - POINTER(c_char_p), # **cend - ] - - # Code_t - # ZParseNotice(char *buffer, - # int len, - # ZNotice_t *notice) - self.ZParseNotice.argtypes = [ - c_char_p, # *buffer - c_int, # len - POINTER(ZNotice_t), # *notice - ] - - # Code_t - # ZFormatNotice(register ZNotice_t *notice, - # char **buffer, - # int *ret_len, - # Z_AuthProc cert_routine) - self.ZFormatNotice.argtypes = [ - POINTER(ZNotice_t), # *notice - POINTER(c_char_p), # **buffer - POINTER(c_int), # *ret_len - c_void_p, # cert_routine - ] - - # int - # ZCompareUID(ZUnique_Id_t *uid1, - # ZUnique_Id_t *uid2) - self.ZCompareUID.argtypes = [ - POINTER(ZUnique_Id_t), # *uid1 - POINTER(ZUnique_Id_t), # *uid2 - ] - - # char * - # ZExpandRealm(realm) - # char *realm; # mmm 80's - self.ZExpandRealm.restype = c_char_p - self.ZExpandRealm.argtypes = [ - c_char_p, # realm - ] - - # unsigned short - # ZGetCharset(char *charset) - self.ZGetCharset.restype = c_ushort - self.ZGetCharset.argtypes = [ - c_char_p, # charset - ] - - # const char * - # ZCharsetToString(unsigned short charset) - self.ZCharsetToString.restype = c_char_p - self.ZCharsetToString.argtypes = [ - c_ushort, # charset - ] - - # Code_t - # ZTransliterate(char *in, - # int inlen, - # char *inset, - # char *outset, - # char **out, - # int *outlen) - self.ZTransliterate.argtypes = [ - c_char_p, # in - c_int, # inlnet, - c_char_p, # inset - c_char_p, # outset - POINTER(c_char_p), # out - POINTER(c_int), # outlen - ] - - # Code_t ZOpenPort(u_short *port) - self.ZOpenPort.argtypes = [ - POINTER(c_ushort), # port - ] - - # const char * - # ZGetCharsetString(char *charset) - self.ZGetCharsetString.restype = c_char_p - self.ZGetCharsetString.argtypes = [ - c_char_p, # charset - ] - - # Code_t - # ZMakeAscii(register char *ptr, - # int len, - # unsigned char *field, - # int num) - self.ZMakeAscii.argtypes = [ - c_char_p, # ptr - c_int, # len - c_char_p, # field; c_uchar_p? - c_int, # num - ] - - # Code_t - # ZMakeZcode(register char *ptr, - # int len, - # unsigned char *field, - # int num) - self.ZMakeZcode.argtypes = [ - c_char_p, # ptr - c_int, # len - c_char_p, # field; c_uchar_p? - c_int, # num - ] - - # struct sockaddr_in ZGetDestAddr (void) { - self.ZGetDestAddr.restype = sockaddr_in - - # library-specific setup... - self.ZInitialize() diff --git a/python/zephyr_ctypes.py b/python/zephyr_ctypes.py new file mode 100755 index 0000000..9825ada --- /dev/null +++ b/python/zephyr_ctypes.py @@ -0,0 +1,486 @@ +#!/usr/bin/python + +"""rough bindings for libzephyr""" + +import socket +import struct +import ctypes +import ctypes.util +import time +import sys +from ctypes import c_int, c_uint, c_ushort, c_char, c_ubyte +from ctypes import c_uint16, c_uint32 +from ctypes import POINTER, c_void_p, c_char_p +from ctypes import Structure, Union, sizeof + +__revision__ = "$Id$" +try: + __version__ = "%s/%s" % (__revision__.split()[3], __revision__.split()[2]) +except IndexError: + __version__ = "unknown" + +def print_line_or_lines(results, indent): + """short values on same line, multi-line on later ones...""" + if len(results) == 1: + print results[0] + else: + print + for result in results: + print indent + result + +def ctypes_pprint(cstruct, indent=""): + """pretty print a ctypes Structure or Union""" + + for field_name, field_ctype in cstruct._fields_: + field_value = getattr(cstruct, field_name) + print indent + field_name, + next_indent = indent + " " + pprint_name = "pprint_%s" % field_name + pformat_name = "pformat_%s" % field_name + if hasattr(cstruct, pprint_name): + # no longer used + getattr(cstruct, pprint_name)(next_indent) + elif hasattr(cstruct, pformat_name): + # counted-array and other common cases + print_line_or_lines(getattr(cstruct, pformat_name)(), next_indent) + elif hasattr(field_value, "pformat"): + # common for small compound types + print_line_or_lines(field_value.pformat(), next_indent) + elif hasattr(field_value, "pprint"): + # useful for Union selectors + field_value.pprint(next_indent) + elif hasattr(field_value, "_fields_"): + # generic recursion + print + ctypes_pprint(field_value, next_indent) + else: + # generic simple (or unknown/uninteresting) value + print field_value + +class Enum(c_int): + def pformat(self): + try: + return ["%s(%d)" % (self._values_[self.value], self.value)] + except IndexError: + return ["unknown enum value(%d)" % (self.value)] + +def populate_enum(cls): + """make members for each of the enum values""" + for value, tag in enumerate(cls._values_): + setattr(cls, tag, cls(value)) + +# not really an enum, but we get a richer effect by treating it as one +class Enum_u16(c_uint16): + def pformat(self): + try: + return ["%s(%d)" % (self._values_[self.value], self.value)] + except IndexError: + return ["unknown enum value(%d)" % (self.value)] + +class Enum_u8(c_ubyte): + def pformat(self): + try: + return ["%s(%d)" % (self._values_[self.value], self.value)] + except IndexError: + return ["unknown enum value(%d)" % (self.value)] + +# POSIX socket types... +class in_addr(Structure): + _fields_ = [ + ("s_addr", c_uint32), + ] + def pformat(self): + return [socket.inet_ntoa(struct.pack("= 0: + sa_has_len = True +elif sys.platform.startswith("linux"): + sa_has_len = False +else: + # ?? + sa_has_len = False + +if sa_has_len: + af_base_type = Enum_u8 +else: + af_base_type = Enum_u16 + +class AF_(af_base_type): + _socket_af = dict([(v,n) for n,v in socket.__dict__.items() if n.startswith("AF_")]) + _values_ = [_socket_af.get(k, "unknown address family") for k in range(min(_socket_af), max(_socket_af)+1)] + +populate_enum(AF_) + +def maybe_add_len_field(len_name, other_fields): + if sa_has_len: + return [(len_name, c_ubyte)] + other_fields + else: + return other_fields + +class sockaddr(Structure): + _fields_ = maybe_add_len_field("sa_len", [ + ("sa_family", AF_), + ("sa_data", c_char * 14), + ]) + +class sockaddr_in(Structure): + _fields_ = maybe_add_len_field("sin_len", [ + ("sin_family", AF_), + ("sin_port", c_uint16), + ("sin_addr", in_addr), + # hack from linux - do we actually need it? + ("sin_zero", c_ubyte * (sizeof(sockaddr)-sizeof(c_uint16)-sizeof(c_uint16)-sizeof(in_addr))), + ]) + def pformat_sin_zero(self): + return ["[ignored]"] + +# RFC2553... +class sockaddr_in6(Structure): + _fields_ = maybe_add_len_field("sin6_len", [ + ("sin6_family", AF_), + ("sin6_port", c_uint16), + ("sin6_flowinfo", c_uint32), + ("sin6_addr", in6_addr), + ("sin6_scope_id", c_uint32), + ]) + +# zephyr/zephyr.h +#define Z_MAXOTHERFIELDS 10 /* Max unknown fields in ZNotice_t */ +Z_MAXOTHERFIELDS = 10 +#define ZAUTH (ZMakeAuthentication) +#define ZCAUTH (ZMakeZcodeAuthentication) +#define ZNOAUTH ((Z_AuthProc)0) +ZNOAUTH = 0 + +# typedef enum { +# UNSAFE, UNACKED, ACKED, HMACK, HMCTL, SERVACK, SERVNAK, CLIENTACK, STAT +# } ZNotice_Kind_t; +# extern const char *ZNoticeKinds[9]; + +class ZNotice_Kind_t(Enum): + _values_ = [ + "UNSAFE", "UNACKED", "ACKED", "HMACK", "HMCTL", "SERVACK", "SERVNAK", "CLIENTACK", "STAT", + ] +populate_enum(ZNotice_Kind_t) + +def pformat_timeval(tv_sec, tv_usec): + """format timeval parts as seconds and human-readable time""" + try: + timestr = time.ctime(tv_sec) + except ValueError: + timestr = "invalid unix time" + if tv_usec >= 1000000 or tv_usec < 0: + # invalid usec, still treat as numbers + return ["%dsec, %dusec (bad) (%s)" % (tv_sec, tv_usec, timestr)] + return ["%d.%06dsec (%s)" % (tv_sec, tv_usec, timestr)] + +# struct _ZTimeval { +class _ZTimeval(Structure): + _fields_ = [ +# int tv_sec; + ("tv_sec", c_int), +# int tv_usec; + ("tv_usec", c_int), +# }; + ] + def pformat(self): + return pformat_timeval(self.tv_sec, self.tv_usec) + + +class _ZTimeval_Net(_ZTimeval): + """When _ZTimeval is used in a ZUnique_Id_t, the time parts are + stored in network byte order. Handle this by faking up a different type.""" + def pformat(self): + return pformat_timeval(socket.ntohl(self.tv_sec & 0xffffffff), socket.ntohl(self.tv_usec & 0xffffffff)) + +# typedef struct _ZUnique_Id_t { +class ZUnique_Id_t(Structure): + _fields_ = [ + # struct in_addr zuid_addr; + ("zuid_addr", in_addr), + # struct _ZTimeval tv; + ("tv", _ZTimeval_Net), + # } ZUnique_Id_t; + ] + +# union { +class _U_z_sender_sockaddr(Union): + _fields_ = [ + # struct sockaddr sa; + ("sa", sockaddr), + # struct sockaddr_in ip4; + ("ip4", sockaddr_in), + # struct sockaddr_in6 ip6; + ("ip6", sockaddr_in6), + # } z_sender_sockaddr; + ] + def pprint(self, indent): + print + if self.sa.sa_family.value == socket.AF_INET: + ctypes_pprint(self.ip4, indent + ".ip4:") + elif self.sa.sa_family.value == socket.AF_INET6: + ctypes_pprint(self.ip6, indent + ".ip6:") + else: + ctypes_pprint(self.sa, indent + ".sa:") + +# typedef struct _ZNotice_t { +class ZNotice_t(Structure): + _fields_ = [ + # char *z_packet; + ("z_packet", c_char_p), + # char *z_version; + ("z_version", c_char_p), + # ZNotice_Kind_t z_kind; + ("z_kind", ZNotice_Kind_t), + # ZUnique_Id_t z_uid; + ("z_uid", ZUnique_Id_t), + # union { + # struct sockaddr sa; + # struct sockaddr_in ip4; + # struct sockaddr_in6 ip6; + # } z_sender_sockaddr; + ("z_sender_sockaddr", _U_z_sender_sockaddr), + + # /* heavily deprecated: */ + # #define z_sender_addr z_sender_sockaddr.ip4.sin_addr + # /* probably a bad idea?: */ + # struct _ZTimeval z_time; + ("z_time", _ZTimeval), + # unsigned short z_port; + ("z_port", c_ushort), + # unsigned short z_charset; + ("z_charset", c_ushort), + # int z_auth; + ("z_auth", c_int), + # int z_checked_auth; + # TODO: fake enum, for display + ("z_checked_auth", c_int), + # int z_authent_len; + ("z_authent_len", c_int), + # char *z_ascii_authent; + ("z_ascii_authent", c_char_p), + # char *z_class; + ("z_class", c_char_p), + # char *z_class_inst; + ("z_class_inst", c_char_p), + # char *z_opcode; + ("z_opcode", c_char_p), + # char *z_sender; + ("z_sender", c_char_p), + # char *z_recipient; + ("z_recipient", c_char_p), + # char *z_default_format; + ("z_default_format", c_char_p), + # char *z_multinotice; + ("z_multinotice", c_char_p), + # ZUnique_Id_t z_multiuid; + ("z_multiuid", ZUnique_Id_t), + # ZChecksum_t z_checksum; + ("z_checksum", c_uint), + # char *z_ascii_checksum; + ("z_ascii_checksum", c_char_p), + # int z_num_other_fields; + ("z_num_other_fields", c_int), + # char *z_other_fields[Z_MAXOTHERFIELDS]; + ("z_other_fields", c_char_p * Z_MAXOTHERFIELDS), + # caddr_t z_message; + ("z_message", c_char_p), # not 1980 + # int z_message_len; + ("z_message_len", c_int), + # int z_num_hdr_fields; + ("z_num_hdr_fields", c_int), + # char **z_hdr_fields; + ("z_hdr_fields", POINTER(c_char_p)), + # } ZNotice_t; + ] + def pformat_z_other_fields(self): + return ["%d: %s" % (n, self.z_other_fields[n]) + for n in range(Z_MAXOTHERFIELDS)] + def pformat_z_hdr_fields(self): + if not self.z_hdr_fields: + return ["NULL"] + return ["%d: %s" % (n, self.z_hdr_fields[n]) + for n in range(self.z_num_hdr_fields)] + +class libZephyr(object): + """wrappers for functions in libZephyr""" + testable_funcs = [ + "ZInitialize", + "ZGetFD", + "ZGetRealm", + "ZGetSender", + "Z_FormatRawHeader", + "ZParseNotice", + "ZFormatNotice", + "ZCompareUID", + "ZExpandRealm", + "ZGetCharsetString", + "ZGetCharset", + "ZCharsetToString", + "ZTransliterate", + "ZOpenPort", + "ZClosePort", + "ZMakeAscii", + "ZMakeZcode", + "ZGetDestAddr", + "ZSetFD", + "ZPending", + ] + def __init__(self, library_path=None): + """connect to the library and build the wrappers""" + if not library_path: + library_path = ctypes.util.find_library("zephyr") + self._lib = ctypes.cdll.LoadLibrary(library_path) + + # grab the Zauthtype variable + self.Zauthtype = ctypes.c_int.in_dll(self._lib, 'Zauthtype').value + + # generic bindings? + for funcname in self.testable_funcs: + setattr(self, funcname, getattr(self._lib, funcname)) + + # TODO: fix return types, caller types in a more generic way later + # (perhaps by parsing the headers or code) + # perhaps metaprogramming or decorators... + self.ZGetRealm.restype = ctypes.c_char_p + self.ZGetSender.restype = ctypes.c_char_p + + # Code_t + # Z_FormatRawHeader(ZNotice_t *notice, + # char *buffer, + # int buffer_len, + # int *len, + # char **cstart, + # char **cend) + # This stuffs a notice into a buffer; cstart/cend point into the checksum in buffer + self.Z_FormatRawHeader.argtypes = [ + c_void_p, # *notice + c_char_p, # *buffer + c_int, # buffer_len + POINTER(c_int), # *len + POINTER(c_char_p), # **cstart + POINTER(c_char_p), # **cend + ] + + # Code_t + # ZParseNotice(char *buffer, + # int len, + # ZNotice_t *notice) + self.ZParseNotice.argtypes = [ + c_char_p, # *buffer + c_int, # len + POINTER(ZNotice_t), # *notice + ] + + # Code_t + # ZFormatNotice(register ZNotice_t *notice, + # char **buffer, + # int *ret_len, + # Z_AuthProc cert_routine) + self.ZFormatNotice.argtypes = [ + POINTER(ZNotice_t), # *notice + POINTER(c_char_p), # **buffer + POINTER(c_int), # *ret_len + c_void_p, # cert_routine + ] + + # int + # ZCompareUID(ZUnique_Id_t *uid1, + # ZUnique_Id_t *uid2) + self.ZCompareUID.argtypes = [ + POINTER(ZUnique_Id_t), # *uid1 + POINTER(ZUnique_Id_t), # *uid2 + ] + + # char * + # ZExpandRealm(realm) + # char *realm; # mmm 80's + self.ZExpandRealm.restype = c_char_p + self.ZExpandRealm.argtypes = [ + c_char_p, # realm + ] + + # unsigned short + # ZGetCharset(char *charset) + self.ZGetCharset.restype = c_ushort + self.ZGetCharset.argtypes = [ + c_char_p, # charset + ] + + # const char * + # ZCharsetToString(unsigned short charset) + self.ZCharsetToString.restype = c_char_p + self.ZCharsetToString.argtypes = [ + c_ushort, # charset + ] + + # Code_t + # ZTransliterate(char *in, + # int inlen, + # char *inset, + # char *outset, + # char **out, + # int *outlen) + self.ZTransliterate.argtypes = [ + c_char_p, # in + c_int, # inlnet, + c_char_p, # inset + c_char_p, # outset + POINTER(c_char_p), # out + POINTER(c_int), # outlen + ] + + # Code_t ZOpenPort(u_short *port) + self.ZOpenPort.argtypes = [ + POINTER(c_ushort), # port + ] + + # const char * + # ZGetCharsetString(char *charset) + self.ZGetCharsetString.restype = c_char_p + self.ZGetCharsetString.argtypes = [ + c_char_p, # charset + ] + + # Code_t + # ZMakeAscii(register char *ptr, + # int len, + # unsigned char *field, + # int num) + self.ZMakeAscii.argtypes = [ + c_char_p, # ptr + c_int, # len + c_char_p, # field; c_uchar_p? + c_int, # num + ] + + # Code_t + # ZMakeZcode(register char *ptr, + # int len, + # unsigned char *field, + # int num) + self.ZMakeZcode.argtypes = [ + c_char_p, # ptr + c_int, # len + c_char_p, # field; c_uchar_p? + c_int, # num + ] + + # struct sockaddr_in ZGetDestAddr (void) { + self.ZGetDestAddr.restype = sockaddr_in + + # library-specific setup... + self.ZInitialize() -- cgit v1.2.3