From d54bda3667ccb164fbfb37bf7b4d7aa3fea2471f Mon Sep 17 00:00:00 2001 From: Karl Ramm Date: Wed, 15 Jul 2009 14:34:02 +0000 Subject: Factor the python library stuff out of the test suite and stick it in it's own file. And then put it in a debian package. --- python/zephyr.py | 458 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 458 insertions(+) create mode 100755 python/zephyr.py (limited to 'python') diff --git a/python/zephyr.py b/python/zephyr.py new file mode 100755 index 0000000..61beea5 --- /dev/null +++ b/python/zephyr.py @@ -0,0 +1,458 @@ +#!/usr/bin/python + +"""rough bindings for libzephyr""" + +import socket +import struct +import ctypes +import ctypes.util +import time +from ctypes import c_int, c_uint, c_ushort, c_char, c_ubyte +from ctypes import c_uint16, c_uint32 +from ctypes import POINTER, c_void_p, c_char_p +from ctypes import Structure, Union, sizeof + +__revision__ = "$Id$" +try: + __version__ = "%s/%s" % (__revision__.split()[3], __revision__.split()[2]) +except IndexError: + __version__ = "unknown" + +def print_line_or_lines(results, indent): + """short values on same line, multi-line on later ones...""" + if len(results) == 1: + print results[0] + else: + print + for result in results: + print indent + result + +def ctypes_pprint(cstruct, indent=""): + """pretty print a ctypes Structure or Union""" + + for field_name, field_ctype in cstruct._fields_: + field_value = getattr(cstruct, field_name) + print indent + field_name, + next_indent = indent + " " + pprint_name = "pprint_%s" % field_name + pformat_name = "pformat_%s" % field_name + if hasattr(cstruct, pprint_name): + # no longer used + getattr(cstruct, pprint_name)(next_indent) + elif hasattr(cstruct, pformat_name): + # counted-array and other common cases + print_line_or_lines(getattr(cstruct, pformat_name)(), next_indent) + elif hasattr(field_value, "pformat"): + # common for small compound types + print_line_or_lines(field_value.pformat(), next_indent) + elif hasattr(field_value, "pprint"): + # useful for Union selectors + field_value.pprint(next_indent) + elif hasattr(field_value, "_fields_"): + # generic recursion + print + ctypes_pprint(field_value, next_indent) + else: + # generic simple (or unknown/uninteresting) value + print field_value + +class Enum(c_int): + def pformat(self): + try: + return ["%s(%d)" % (self._values_[self.value], self.value)] + except IndexError: + return ["unknown enum value(%d)" % (self.value)] + +def populate_enum(cls): + """make members for each of the enum values""" + for value, tag in enumerate(cls._values_): + setattr(cls, tag, cls(value)) + +# not really an enum, but we get a richer effect by treating it as one +class Enum_u16(c_uint16): + def pformat(self): + try: + return ["%s(%d)" % (self._values_[self.value], self.value)] + except IndexError: + return ["unknown enum value(%d)" % (self.value)] + +# POSIX socket types... +class in_addr(Structure): + _fields_ = [ + ("s_addr", c_uint32), + ] + def pformat(self): + return [socket.inet_ntoa(struct.pack("= 1000000 or tv_usec < 0: + # invalid usec, still treat as numbers + return ["%dsec, %dusec (bad) (%s)" % (tv_sec, tv_usec, timestr)] + return ["%d.%06dsec (%s)" % (tv_sec, tv_usec, timestr)] + +# struct _ZTimeval { +class _ZTimeval(Structure): + _fields_ = [ +# int tv_sec; + ("tv_sec", c_int), +# int tv_usec; + ("tv_usec", c_int), +# }; + ] + def pformat(self): + return pformat_timeval(self.tv_sec, self.tv_usec) + + +class _ZTimeval_Net(_ZTimeval): + """When _ZTimeval is used in a ZUnique_Id_t, the time parts are + stored in network byte order. Handle this by faking up a different type.""" + def pformat(self): + return pformat_timeval(socket.ntohl(self.tv_sec), socket.ntohl(self.tv_usec)) + +# typedef struct _ZUnique_Id_t { +class ZUnique_Id_t(Structure): + _fields_ = [ + # struct in_addr zuid_addr; + ("zuid_addr", in_addr), + # struct _ZTimeval tv; + ("tv", _ZTimeval_Net), + # } ZUnique_Id_t; + ] + +# union { +class _U_z_sender_sockaddr(Union): + _fields_ = [ + # struct sockaddr sa; + ("sa", sockaddr), + # struct sockaddr_in ip4; + ("ip4", sockaddr_in), + # struct sockaddr_in6 ip6; + ("ip6", sockaddr_in6), + # } z_sender_sockaddr; + ] + def pprint(self, indent): + print + if self.sa.sa_family.value == socket.AF_INET: + ctypes_pprint(self.ip4, indent + ".ip4:") + elif self.sa.sa_family.value == socket.AF_INET6: + ctypes_pprint(self.ip6, indent + ".ip6:") + else: + ctypes_pprint(self.sa, indent + ".sa:") + +# typedef struct _ZNotice_t { +class ZNotice_t(Structure): + _fields_ = [ + # char *z_packet; + ("z_packet", c_char_p), + # char *z_version; + ("z_version", c_char_p), + # ZNotice_Kind_t z_kind; + ("z_kind", ZNotice_Kind_t), + # ZUnique_Id_t z_uid; + ("z_uid", ZUnique_Id_t), + # union { + # struct sockaddr sa; + # struct sockaddr_in ip4; + # struct sockaddr_in6 ip6; + # } z_sender_sockaddr; + ("z_sender_sockaddr", _U_z_sender_sockaddr), + + # /* heavily deprecated: */ + # #define z_sender_addr z_sender_sockaddr.ip4.sin_addr + # /* probably a bad idea?: */ + # struct _ZTimeval z_time; + ("z_time", _ZTimeval), + # unsigned short z_port; + ("z_port", c_ushort), + # unsigned short z_charset; + ("z_charset", c_ushort), + # int z_auth; + ("z_auth", c_int), + # int z_checked_auth; + # TODO: fake enum, for display + ("z_checked_auth", c_int), + # int z_authent_len; + ("z_authent_len", c_int), + # char *z_ascii_authent; + ("z_ascii_authent", c_char_p), + # char *z_class; + ("z_class", c_char_p), + # char *z_class_inst; + ("z_class_inst", c_char_p), + # char *z_opcode; + ("z_opcode", c_char_p), + # char *z_sender; + ("z_sender", c_char_p), + # char *z_recipient; + ("z_recipient", c_char_p), + # char *z_default_format; + ("z_default_format", c_char_p), + # char *z_multinotice; + ("z_multinotice", c_char_p), + # ZUnique_Id_t z_multiuid; + ("z_multiuid", ZUnique_Id_t), + # ZChecksum_t z_checksum; + ("z_checksum", c_uint), + # char *z_ascii_checksum; + ("z_ascii_checksum", c_char_p), + # int z_num_other_fields; + ("z_num_other_fields", c_int), + # char *z_other_fields[Z_MAXOTHERFIELDS]; + ("z_other_fields", c_char_p * Z_MAXOTHERFIELDS), + # caddr_t z_message; + ("z_message", c_char_p), # not 1980 + # int z_message_len; + ("z_message_len", c_int), + # int z_num_hdr_fields; + ("z_num_hdr_fields", c_int), + # char **z_hdr_fields; + ("z_hdr_fields", POINTER(c_char_p)), + # } ZNotice_t; + ] + def pformat_z_other_fields(self): + return ["%d: %s" % (n, self.z_other_fields[n]) + for n in range(Z_MAXOTHERFIELDS)] + def pformat_z_hdr_fields(self): + if not self.z_hdr_fields: + return ["NULL"] + return ["%d: %s" % (n, self.z_hdr_fields[n]) + for n in range(self.z_num_hdr_fields)] + +class libZephyr(object): + """wrappers for functions in libZephyr""" + testable_funcs = [ + "ZInitialize", + "ZGetFD", + "ZGetRealm", + "ZGetSender", + "Z_FormatRawHeader", + "ZParseNotice", + "ZFormatNotice", + "ZCompareUID", + "ZExpandRealm", + "ZGetCharsetString", + "ZGetCharset", + "ZCharsetToString", + "ZTransliterate", + "ZOpenPort", + "ZClosePort", + "ZMakeAscii", + "ZMakeZcode", + "ZGetDestAddr", + "ZSetFD", + "ZPending", + ] + def __init__(self, library_path=None): + """connect to the library and build the wrappers""" + if not library_path: + library_path = ctypes.util.find_library("zephyr") + self._lib = ctypes.cdll.LoadLibrary(library_path) + + # grab the Zauthtype variable + self.Zauthtype = ctypes.c_int.in_dll(self._lib, 'Zauthtype').value + + # generic bindings? + for funcname in self.testable_funcs: + setattr(self, funcname, getattr(self._lib, funcname)) + + # TODO: fix return types, caller types in a more generic way later + # (perhaps by parsing the headers or code) + # perhaps metaprogramming or decorators... + self.ZGetRealm.restype = ctypes.c_char_p + self.ZGetSender.restype = ctypes.c_char_p + + # Code_t + # Z_FormatRawHeader(ZNotice_t *notice, + # char *buffer, + # int buffer_len, + # int *len, + # char **cstart, + # char **cend) + # This stuffs a notice into a buffer; cstart/cend point into the checksum in buffer + self.Z_FormatRawHeader.argtypes = [ + c_void_p, # *notice + c_char_p, # *buffer + c_int, # buffer_len + POINTER(c_int), # *len + POINTER(c_char_p), # **cstart + POINTER(c_char_p), # **cend + ] + + # Code_t + # ZParseNotice(char *buffer, + # int len, + # ZNotice_t *notice) + self.ZParseNotice.argtypes = [ + c_char_p, # *buffer + c_int, # len + POINTER(ZNotice_t), # *notice + ] + + # Code_t + # ZFormatNotice(register ZNotice_t *notice, + # char **buffer, + # int *ret_len, + # Z_AuthProc cert_routine) + self.ZFormatNotice.argtypes = [ + POINTER(ZNotice_t), # *notice + POINTER(c_char_p), # **buffer + POINTER(c_int), # *ret_len + c_void_p, # cert_routine + ] + + # int + # ZCompareUID(ZUnique_Id_t *uid1, + # ZUnique_Id_t *uid2) + self.ZCompareUID.argtypes = [ + POINTER(ZUnique_Id_t), # *uid1 + POINTER(ZUnique_Id_t), # *uid2 + ] + + # char * + # ZExpandRealm(realm) + # char *realm; # mmm 80's + self.ZExpandRealm.restype = c_char_p + self.ZExpandRealm.argtypes = [ + c_char_p, # realm + ] + + # unsigned short + # ZGetCharset(char *charset) + self.ZGetCharset.restype = c_ushort + self.ZGetCharset.argtypes = [ + c_char_p, # charset + ] + + # const char * + # ZCharsetToString(unsigned short charset) + self.ZCharsetToString.restype = c_char_p + self.ZCharsetToString.argtypes = [ + c_ushort, # charset + ] + + # Code_t + # ZTransliterate(char *in, + # int inlen, + # char *inset, + # char *outset, + # char **out, + # int *outlen) + self.ZTransliterate.argtypes = [ + c_char_p, # in + c_int, # inlnet, + c_char_p, # inset + c_char_p, # outset + POINTER(c_char_p), # out + POINTER(c_int), # outlen + ] + + # Code_t ZOpenPort(u_short *port) + self.ZOpenPort.argtypes = [ + POINTER(c_ushort), # port + ] + + # const char * + # ZGetCharsetString(char *charset) + self.ZGetCharsetString.restype = c_char_p + self.ZGetCharsetString.argtypes = [ + c_char_p, # charset + ] + + # Code_t + # ZMakeAscii(register char *ptr, + # int len, + # unsigned char *field, + # int num) + self.ZMakeAscii.argtypes = [ + c_char_p, # ptr + c_int, # len + c_char_p, # field; c_uchar_p? + c_int, # num + ] + + # Code_t + # ZMakeZcode(register char *ptr, + # int len, + # unsigned char *field, + # int num) + self.ZMakeZcode.argtypes = [ + c_char_p, # ptr + c_int, # len + c_char_p, # field; c_uchar_p? + c_int, # num + ] + + # struct sockaddr_in ZGetDestAddr (void) { + self.ZGetDestAddr.restype = sockaddr_in + + # library-specific setup... + self.ZInitialize() -- cgit v1.2.3