#!/usr/bin/python

"""rough bindings for libzephyr"""

import socket
import struct
import ctypes
import ctypes.util
import time
import sys
from ctypes import c_int, c_uint, c_ushort, c_char, c_ubyte
from ctypes import c_uint16, c_uint32
from ctypes import POINTER, c_void_p, c_char_p
from ctypes import Structure, Union, sizeof

__revision__ = "$Id$"
try:
    __version__ = "%s/%s" % (__revision__.split()[3], __revision__.split()[2])
except IndexError:
    # unexpanded keyword ("$Id$") has too few words to pick a version from
    __version__ = "unknown"


def print_line_or_lines(results, indent, lead=""):
    """short values on same line, multi-line on later ones...

    results -- list of already formatted strings
    indent  -- prefix written before each line of a multi-line result
    lead    -- prefix written before a single-line result (default: none);
               ctypes_pprint uses it to separate the value from the field
               name it has already written on the same line
    """
    if len(results) == 1:
        print(lead + results[0])
    else:
        # finish the current line, then one result per line
        sys.stdout.write("\n")
        for result in results:
            print(indent + result)


def ctypes_pprint(cstruct, indent=""):
    """pretty print a ctypes Structure or Union

    Writes one entry per member of cstruct._fields_ to sys.stdout.  The field
    name is written first; how the value follows depends on which formatting
    hook is found (checked in the order below).  Hooks named pprint take the
    indent for nested output and are expected to finish the field-name line
    themselves.
    """
    for field_name, field_ctype in cstruct._fields_:
        field_value = getattr(cstruct, field_name)
        # no newline yet: the chosen branch decides what follows the name
        sys.stdout.write(indent + field_name)
        # NOTE(review): the width of this indent step may have been altered
        # when the file's whitespace was collapsed -- confirm
        next_indent = indent + " "
        pprint_name = "pprint_%s" % field_name
        pformat_name = "pformat_%s" % field_name
        if hasattr(cstruct, pprint_name):
            # no longer used
            getattr(cstruct, pprint_name)(next_indent)
        elif hasattr(cstruct, pformat_name):
            # counted-array and other common cases
            print_line_or_lines(getattr(cstruct, pformat_name)(),
                                next_indent, " ")
        elif hasattr(field_value, "pformat"):
            # common for small compound types
            print_line_or_lines(field_value.pformat(), next_indent, " ")
        elif hasattr(field_value, "pprint"):
            # useful for Union selectors
            field_value.pprint(next_indent)
        elif hasattr(field_value, "_fields_"):
            # generic recursion
            sys.stdout.write("\n")
            ctypes_pprint(field_value, next_indent)
        else:
            # generic simple (or unknown/uninteresting) value
            sys.stdout.write(" %s\n" % (field_value,))


def _pformat_enum(enum_value):
    """shared pformat body for the Enum* types: ["NAME(number)"]

    The range is checked explicitly rather than relying on IndexError,
    because a negative number would otherwise index _values_ from the end
    and report the wrong name.
    """
    number = enum_value.value
    names = enum_value._values_
    if 0 <= number < len(names):
        return ["%s(%d)" % (names[number], number)]
    return ["unknown enum value(%d)" % (number)]


class Enum(c_int):
    """c_int whose subclasses list their value names in _values_"""

    def pformat(self):
        """return a one-element list naming the current value"""
        return _pformat_enum(self)


def populate_enum(cls):
    """make members for each of the enum values"""
    for value, tag in enumerate(cls._values_):
        setattr(cls, tag, cls(value))


# not really an enum, but we get a richer effect by treating it as one
class Enum_u16(c_uint16):
    """16-bit unsigned variant of Enum"""

    def pformat(self):
        """return a one-element list naming the current value"""
        return _pformat_enum(self)


class Enum_u8(c_ubyte):
    """8-bit unsigned variant of Enum"""

    def pformat(self):
        """return a one-element list naming the current value"""
        return _pformat_enum(self)


# POSIX socket types...
class in_addr(Structure):
    """struct in_addr"""
    _fields_ = [
        ("s_addr", c_uint32),
        ]

    def pformat(self):
        """return the address as a one-element list of dotted-quad text"""
        # NOTE(review): the pack format was lost from the source text and is
        # reconstructed.  Native order ("=I") hands inet_ntoa the same four
        # bytes the field holds in memory -- confirm against the original.
        return [socket.inet_ntoa(struct.pack("=I", self.s_addr))]


# NOTE(review): the original definition of in6_addr was lost from the source
# text; sockaddr_in6 needs the name, so a minimal 16-byte layout is
# reconstructed here -- confirm field names against the original.
class in6_addr(Structure):
    """struct in6_addr (reconstructed)"""
    _fields_ = [
        ("s6_addr", c_ubyte * 16),
        ]


# NOTE(review): only ">= 0" survived of the first condition; a substring
# test for "bsd" in the platform name is reconstructed -- confirm.
if sys.platform.find("bsd") >= 0:
    sa_has_len = True
elif sys.platform.startswith("linux"):
    sa_has_len = False
else:
    # ??
    sa_has_len = False

if sa_has_len:
    af_base_type = Enum_u8
else:
    af_base_type = Enum_u16


def _address_family_table():
    """return (value -> name dict, list of names indexed by value) for AF_*

    Built in a function because a comprehension inside a class body cannot
    see other class-level names on Python 3.
    """
    by_value = dict((value, name) for name, value in socket.__dict__.items()
                    if name.startswith("AF_"))
    names = [by_value.get(k, "unknown address family")
             for k in range(min(by_value), max(by_value) + 1)]
    return by_value, names


class AF_(af_base_type):
    """address family, named after the socket module's AF_* constants"""
    _socket_af, _values_ = _address_family_table()

populate_enum(AF_)


def maybe_add_len_field(len_name, other_fields):
    """prepend a one-byte length field named len_name when sa_has_len is set

    Returns the field list to use for _fields_; other_fields is returned
    unchanged when sa_has_len is false.
    """
    if sa_has_len:
        return [(len_name, c_ubyte)] + other_fields
    else:
        return other_fields


class sockaddr(Structure):
    """struct sockaddr"""
    _fields_ = maybe_add_len_field("sa_len", [
            ("sa_family", AF_),
            ("sa_data", c_char * 14),
            ])


class sockaddr_in(Structure):
    """struct sockaddr_in, padded out to sizeof(sockaddr)"""
    _fields_ = maybe_add_len_field("sin_len", [
            ("sin_family", AF_),
            ("sin_port", c_uint16),
            ("sin_addr", in_addr),
            # hack from linux - do we actually need it?
            ("sin_zero", c_ubyte * (sizeof(sockaddr)
                                    - sizeof(c_uint16)
                                    - sizeof(c_uint16)
                                    - sizeof(in_addr))),
            ])

    def pformat_sin_zero(self):
        """padding is never interesting to display"""
        return ["[ignored]"]


# RFC2553...
class sockaddr_in6(Structure):
    """struct sockaddr_in6"""
    _fields_ = maybe_add_len_field("sin6_len", [
            ("sin6_family", AF_),
            ("sin6_port", c_uint16),
            ("sin6_flowinfo", c_uint32),
            ("sin6_addr", in6_addr),
            ("sin6_scope_id", c_uint32),
            ])


# zephyr/zephyr.h
#define Z_MAXOTHERFIELDS 10 /* Max unknown fields in ZNotice_t */
Z_MAXOTHERFIELDS = 10
#define ZAUTH (ZMakeAuthentication)
#define ZCAUTH (ZMakeZcodeAuthentication)
#define ZNOAUTH ((Z_AuthProc)0)
ZNOAUTH = 0

# typedef enum {
#     UNSAFE, UNACKED, ACKED, HMACK, HMCTL, SERVACK, SERVNAK, CLIENTACK, STAT
# } ZNotice_Kind_t;
# extern const char *ZNoticeKinds[9];


class ZNotice_Kind_t(Enum):
    """ZNotice_Kind_t; names are listed in value order"""
    _values_ = [
        "UNSAFE", "UNACKED", "ACKED", "HMACK", "HMCTL",
        "SERVACK", "SERVNAK", "CLIENTACK", "STAT",
        ]

populate_enum(ZNotice_Kind_t)


def pformat_timeval(tv_sec, tv_usec):
    """format timeval parts as seconds and human-readable time

    Returns a one-element list.  A tv_sec that time.ctime cannot convert is
    shown as "invalid unix time"; a tv_usec outside 0..999999 is flagged
    as bad but still printed numerically.
    """
    try:
        timestr = time.ctime(tv_sec)
    except (ValueError, OverflowError, OSError):
        # which of these is raised for an out-of-range time depends on the
        # Python version and platform
        timestr = "invalid unix time"
    if tv_usec >= 1000000 or tv_usec < 0:
        # invalid usec, still treat as numbers
        return ["%dsec, %dusec (bad) (%s)" % (tv_sec, tv_usec, timestr)]
    return ["%d.%06dsec (%s)" % (tv_sec, tv_usec, timestr)]


# struct _ZTimeval {
class _ZTimeval(Structure):
    """struct _ZTimeval"""
    _fields_ = [
        # int tv_sec;
        ("tv_sec", c_int),
        # int tv_usec;
        ("tv_usec", c_int),
        # };
        ]

    def pformat(self):
        """format both parts via pformat_timeval"""
        return pformat_timeval(self.tv_sec, self.tv_usec)


class _ZTimeval_Net(_ZTimeval):
    """When _ZTimeval is used in a ZUnique_Id_t, the time parts are
    stored in network byte order.  Handle this by faking up a different type."""

    def pformat(self):
        """format both parts after converting them to host byte order"""
        # mask to 32 bits first: the fields are signed ints
        return pformat_timeval(socket.ntohl(self.tv_sec & 0xffffffff),
                               socket.ntohl(self.tv_usec & 0xffffffff))


# typedef struct _ZUnique_Id_t {
class ZUnique_Id_t(Structure):
    """ZUnique_Id_t"""
    _fields_ = [
        # struct in_addr zuid_addr;
        ("zuid_addr", in_addr),
        # struct _ZTimeval tv;
        ("tv", _ZTimeval_Net),
        # } ZUnique_Id_t;
        ]


# union {
class _U_z_sender_sockaddr(Union):
    """the z_sender_sockaddr union inside ZNotice_t"""
    _fields_ = [
        # struct sockaddr sa;
        ("sa", sockaddr),
        # struct sockaddr_in ip4;
        ("ip4", sockaddr_in),
        # struct sockaddr_in6 ip6;
        ("ip6", sockaddr_in6),
        # } z_sender_sockaddr;
        ]

    def pprint(self, indent):
        """print only the union member selected by the address family

        Ends the current output line first, then pretty prints ip4, ip6 or
        (for any other family) the generic sa member.
        """
        sys.stdout.write("\n")
        family = self.sa.sa_family.value
        if family == socket.AF_INET:
            ctypes_pprint(self.ip4, indent + ".ip4:")
        elif family == socket.AF_INET6:
            ctypes_pprint(self.ip6, indent + ".ip6:")
        else:
            ctypes_pprint(self.sa, indent + ".sa:")


# typedef struct _ZNotice_t {
class ZNotice_t(Structure):
    """ZNotice_t; the C declaration of each member is quoted above it"""
    _fields_ = [
        # char *z_packet;
        ("z_packet", c_char_p),
        # char *z_version;
        ("z_version", c_char_p),
        # ZNotice_Kind_t z_kind;
        ("z_kind", ZNotice_Kind_t),
        # ZUnique_Id_t z_uid;
        ("z_uid", ZUnique_Id_t),
        # union {
        #     struct sockaddr sa;
        #     struct sockaddr_in ip4;
        #     struct sockaddr_in6 ip6;
        # } z_sender_sockaddr;
        ("z_sender_sockaddr", _U_z_sender_sockaddr),
        # /* heavily deprecated: */
        # #define z_sender_addr z_sender_sockaddr.ip4.sin_addr
        # /* probably a bad idea?: */
        # struct _ZTimeval z_time;
        ("z_time", _ZTimeval),
        # unsigned short z_port;
        ("z_port", c_ushort),
        # unsigned short z_charset;
        ("z_charset", c_ushort),
        # int z_auth;
        ("z_auth", c_int),
        # int z_checked_auth;
        # TODO: fake enum, for display
        ("z_checked_auth", c_int),
        # int z_authent_len;
        ("z_authent_len", c_int),
        # char *z_ascii_authent;
        ("z_ascii_authent", c_char_p),
        # char *z_class;
        ("z_class", c_char_p),
        # char *z_class_inst;
        ("z_class_inst", c_char_p),
        # char *z_opcode;
        ("z_opcode", c_char_p),
        # char *z_sender;
        ("z_sender", c_char_p),
        # char *z_recipient;
        ("z_recipient", c_char_p),
        # char *z_default_format;
        ("z_default_format", c_char_p),
        # char *z_multinotice;
        ("z_multinotice", c_char_p),
        # ZUnique_Id_t z_multiuid;
        ("z_multiuid", ZUnique_Id_t),
        # ZChecksum_t z_checksum;
        ("z_checksum", c_uint),
        # char *z_ascii_checksum;
        ("z_ascii_checksum", c_char_p),
        # int z_num_other_fields;
        ("z_num_other_fields", c_int),
        # char *z_other_fields[Z_MAXOTHERFIELDS];
        ("z_other_fields", c_char_p * Z_MAXOTHERFIELDS),
        # caddr_t z_message;
        ("z_message", c_char_p),  # not 1980
        # int z_message_len;
        ("z_message_len", c_int),
        # int z_num_hdr_fields;
        ("z_num_hdr_fields", c_int),
        # char **z_hdr_fields;
        ("z_hdr_fields", POINTER(c_char_p)),
        # } ZNotice_t;
        ]

    def pformat_z_other_fields(self):
        """list every z_other_fields slot as "index: value"

        All Z_MAXOTHERFIELDS slots are listed, not just the first
        z_num_other_fields of them.
        """
        return ["%d: %s" % (n, self.z_other_fields[n])
                for n in range(Z_MAXOTHERFIELDS)]

    def pformat_z_hdr_fields(self):
        """list the first z_num_hdr_fields header fields, or NULL if unset"""
        if not self.z_hdr_fields:
            return ["NULL"]
        return ["%d: %s" % (n, self.z_hdr_fields[n])
                for n in range(self.z_num_hdr_fields)]


class libZephyr(object):
    """wrappers for functions in libZephyr"""
    testable_funcs = [
        "ZInitialize",
        "ZGetFD",
        "ZGetRealm",
        "ZGetSender",
        "Z_FormatRawHeader",
        "ZParseNotice",
        "ZFormatNotice",
        "ZCompareUID",
        "ZExpandRealm",
        "ZGetCharsetString",
        "ZGetCharset",
        "ZCharsetToString",
        "ZTransliterate",
        "ZOpenPort",
        "ZClosePort",
        "ZMakeAscii",
        "ZMakeZcode",
        "ZGetDestAddr",
        "ZSetFD",
        "ZPending",
        ]

    def __init__(self, library_path=None):
        """connect to the library and build the wrappers

        library_path -- path or name of the shared library to load; when
                        empty, ctypes.util.find_library("zephyr") is used.

        Raises OSError when no library_path is given and none can be found.
        Finishes by calling ZInitialize(); its return value is not checked.
        """
        if not library_path:
            library_path = ctypes.util.find_library("zephyr")
            if not library_path:
                # LoadLibrary(None) would not fail here, only later with a
                # misleading missing-symbol error
                raise OSError("could not locate the zephyr shared library; "
                              "pass library_path explicitly")
        self._lib = ctypes.cdll.LoadLibrary(library_path)

        # grab the Zauthtype variable
        self.Zauthtype = ctypes.c_int.in_dll(self._lib, 'Zauthtype').value

        # generic bindings?
        for funcname in self.testable_funcs:
            setattr(self, funcname, getattr(self._lib, funcname))

        # TODO: fix return types, caller types in a more generic way later
        #   (perhaps by parsing the headers or code)
        #   perhaps metaprogramming or decorators...
        self.ZGetRealm.restype = ctypes.c_char_p
        self.ZGetSender.restype = ctypes.c_char_p

        # Code_t
        # Z_FormatRawHeader(ZNotice_t *notice,
        #                   char *buffer,
        #                   int buffer_len,
        #                   int *len,
        #                   char **cstart,
        #                   char **cend)
        # This stuffs a notice into a buffer; cstart/cend point into the checksum in buffer
        self.Z_FormatRawHeader.argtypes = [
            c_void_p,           # *notice
            c_char_p,           # *buffer
            c_int,              # buffer_len
            POINTER(c_int),     # *len
            POINTER(c_char_p),  # **cstart
            POINTER(c_char_p),  # **cend
            ]

        # Code_t
        # ZParseNotice(char *buffer,
        #              int len,
        #              ZNotice_t *notice)
        self.ZParseNotice.argtypes = [
            c_char_p,            # *buffer
            c_int,               # len
            POINTER(ZNotice_t),  # *notice
            ]

        # Code_t
        # ZFormatNotice(register ZNotice_t *notice,
        #               char **buffer,
        #               int *ret_len,
        #               Z_AuthProc cert_routine)
        self.ZFormatNotice.argtypes = [
            POINTER(ZNotice_t),  # *notice
            POINTER(c_char_p),   # **buffer
            POINTER(c_int),      # *ret_len
            c_void_p,            # cert_routine
            ]

        # int
        # ZCompareUID(ZUnique_Id_t *uid1,
        #             ZUnique_Id_t *uid2)
        self.ZCompareUID.argtypes = [
            POINTER(ZUnique_Id_t),  # *uid1
            POINTER(ZUnique_Id_t),  # *uid2
            ]

        # char *
        # ZExpandRealm(realm)
        # char *realm; # mmm 80's
        self.ZExpandRealm.restype = c_char_p
        self.ZExpandRealm.argtypes = [
            c_char_p,  # realm
            ]

        # unsigned short
        # ZGetCharset(char *charset)
        self.ZGetCharset.restype = c_ushort
        self.ZGetCharset.argtypes = [
            c_char_p,  # charset
            ]

        # const char *
        # ZCharsetToString(unsigned short charset)
        self.ZCharsetToString.restype = c_char_p
        self.ZCharsetToString.argtypes = [
            c_ushort,  # charset
            ]

        # Code_t
        # ZTransliterate(char *in,
        #                int inlen,
        #                char *inset,
        #                char *outset,
        #                char **out,
        #                int *outlen)
        self.ZTransliterate.argtypes = [
            c_char_p,           # in
            c_int,              # inlen
            c_char_p,           # inset
            c_char_p,           # outset
            POINTER(c_char_p),  # out
            POINTER(c_int),     # outlen
            ]

        # Code_t ZOpenPort(u_short *port)
        self.ZOpenPort.argtypes = [
            POINTER(c_ushort),  # port
            ]

        # const char *
        # ZGetCharsetString(char *charset)
        self.ZGetCharsetString.restype = c_char_p
        self.ZGetCharsetString.argtypes = [
            c_char_p,  # charset
            ]

        # Code_t
        # ZMakeAscii(register char *ptr,
        #            int len,
        #            unsigned char *field,
        #            int num)
        self.ZMakeAscii.argtypes = [
            c_char_p,  # ptr
            c_int,     # len
            c_char_p,  # field; c_uchar_p?
            c_int,     # num
            ]

        # Code_t
        # ZMakeZcode(register char *ptr,
        #            int len,
        #            unsigned char *field,
        #            int num)
        self.ZMakeZcode.argtypes = [
            c_char_p,  # ptr
            c_int,     # len
            c_char_p,  # field; c_uchar_p?
            c_int,     # num
            ]

        # struct sockaddr_in ZGetDestAddr (void) {
        self.ZGetDestAddr.restype = sockaddr_in

        # library-specific setup...
        self.ZInitialize()