summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorGravatar Karl Ramm <kcr@1ts.org>2009-07-15 14:34:02 +0000
committerGravatar Karl Ramm <kcr@1ts.org>2009-07-15 14:34:02 +0000
commitd54bda3667ccb164fbfb37bf7b4d7aa3fea2471f (patch)
tree542a5ac3259a7991faa4747c411ff51dd7d68c08 /lib
parent827e165e5b80d999a8a30b3292f9bbb4a8c772e1 (diff)
Factor the python library stuff out of the test suite and stick
it in it's own file. And then put it in a debian package.
Diffstat (limited to 'lib')
-rw-r--r--lib/Makefile.in4
-rwxr-xr-xlib/zephyr_tests.py448
2 files changed, 5 insertions, 447 deletions
diff --git a/lib/Makefile.in b/lib/Makefile.in
index 7541917..cbdb9e3 100644
--- a/lib/Makefile.in
+++ b/lib/Makefile.in
@@ -59,8 +59,8 @@ zephyr_err.c ${BUILDTOP}/h/zephyr/zephyr_err.h: zephyr_err.et
${LIBTOOL} --mode=compile ${CC} -c -o $@ ${ALL_CFLAGS} $<
check:
- python $(srcdir)/zephyr_tests.py --builddir=$(BUILDTOP)
- $(srcdir)/zephyr_run_doctests --builddir=$(BUILDTOP)
+ PYTHONPATH=${top_srcdir}/python python $(srcdir)/zephyr_tests.py --builddir=$(BUILDTOP)
+ PYTHONPATH=${top_srcdir}/python $(srcdir)/zephyr_run_doctests --builddir=$(BUILDTOP)
install: libzephyr.la
${LIBTOOL} --mode=install ${INSTALL} -m 644 libzephyr.la \
diff --git a/lib/zephyr_tests.py b/lib/zephyr_tests.py
index 82983e9..dc2000f 100755
--- a/lib/zephyr_tests.py
+++ b/lib/zephyr_tests.py
@@ -11,14 +11,12 @@
import optparse
import os
import socket
-import struct
import ctypes
import ctypes.util
import time
-from ctypes import c_int, c_uint, c_ushort, c_char, c_ubyte
-from ctypes import c_uint16, c_uint32
-from ctypes import POINTER, c_void_p, c_char_p
-from ctypes import Structure, Union, sizeof
+from ctypes import c_int, c_char, POINTER, c_char_p, sizeof
+
+from zephyr import *
__revision__ = "$Id$"
try:
@@ -26,65 +24,6 @@ try:
except IndexError:
__version__ = "unknown"
-def print_line_or_lines(results, indent):
- """short values on same line, multi-line on later ones..."""
- if len(results) == 1:
- print results[0]
- else:
- print
- for result in results:
- print indent + result
-
-def ctypes_pprint(cstruct, indent=""):
- """pretty print a ctypes Structure or Union"""
-
- for field_name, field_ctype in cstruct._fields_:
- field_value = getattr(cstruct, field_name)
- print indent + field_name,
- next_indent = indent + " "
- pprint_name = "pprint_%s" % field_name
- pformat_name = "pformat_%s" % field_name
- if hasattr(cstruct, pprint_name):
- # no longer used
- getattr(cstruct, pprint_name)(next_indent)
- elif hasattr(cstruct, pformat_name):
- # counted-array and other common cases
- print_line_or_lines(getattr(cstruct, pformat_name)(), next_indent)
- elif hasattr(field_value, "pformat"):
- # common for small compound types
- print_line_or_lines(field_value.pformat(), next_indent)
- elif hasattr(field_value, "pprint"):
- # useful for Union selectors
- field_value.pprint(next_indent)
- elif hasattr(field_value, "_fields_"):
- # generic recursion
- print
- ctypes_pprint(field_value, next_indent)
- else:
- # generic simple (or unknown/uninteresting) value
- print field_value
-
-class Enum(c_int):
- def pformat(self):
- try:
- return ["%s(%d)" % (self._values_[self.value], self.value)]
- except IndexError:
- return ["unknown enum value(%d)" % (self.value)]
-
-def populate_enum(cls):
- """make members for each of the enum values"""
- for value, tag in enumerate(cls._values_):
- setattr(cls, tag, cls(value))
-
-# not really an enum, but we get a richer effect by treating it as one
-class Enum_u16(c_uint16):
- def pformat(self):
- try:
- return ["%s(%d)" % (self._values_[self.value], self.value)]
- except IndexError:
- return ["unknown enum value(%d)" % (self.value)]
-
-
# TODO: pick some real framework later, we're just poking around for now
class TestSuite(object):
"""test collection and runner"""
@@ -111,387 +50,6 @@ class TestSuite(object):
class TestFailure(Exception):
pass
-# POSIX socket types...
-class in_addr(Structure):
- _fields_ = [
- ("s_addr", c_uint32),
- ]
- def pformat(self):
- return [socket.inet_ntoa(struct.pack("<I", self.s_addr))]
-
-class _U_in6_u(Union):
- _fields_ = [
- ("u6_addr8", c_ubyte * 16),
- ("u6_addr16", c_uint16 * 8),
- ("u6_addr32", c_uint32 * 4),
- ]
-
-class in6_addr(Structure):
- _fields_ = [
- ("in6_u", _U_in6_u),
- ]
-
-class AF_(Enum_u16):
- _socket_af = dict([(v,n) for n,v in socket.__dict__.items() if n.startswith("AF_")])
- _values_ = [_socket_af.get(k, "unknown address family") for k in range(min(_socket_af), max(_socket_af)+1)]
-
-populate_enum(AF_)
-
-class sockaddr(Structure):
- _fields_ = [
- ("sa_family", AF_),
- ("sa_data", c_char * 14),
- ]
-
-class sockaddr_in(Structure):
- _fields_ = [
- ("sin_family", AF_),
- ("sin_port", c_uint16),
- ("sin_addr", in_addr),
- # hack from linux - do we actually need it?
- ("sin_zero", c_ubyte * (sizeof(sockaddr)-sizeof(c_uint16)-sizeof(c_uint16)-sizeof(in_addr))),
- ]
- def pformat_sin_zero(self):
- return ["[ignored]"]
-
-# RFC2553...
-class sockaddr_in6(Structure):
- _fields_ = [
- ("sin6_family", AF_),
- ("sin6_port", c_uint16),
- ("sin6_flowinfo", c_uint32),
- ("sin6_addr", in6_addr),
- ("sin6_scope_id", c_uint32),
- ]
-
-# zephyr/zephyr.h
-#define Z_MAXOTHERFIELDS 10 /* Max unknown fields in ZNotice_t */
-Z_MAXOTHERFIELDS = 10
-#define ZAUTH (ZMakeAuthentication)
-#define ZCAUTH (ZMakeZcodeAuthentication)
-#define ZNOAUTH ((Z_AuthProc)0)
-ZNOAUTH = 0
-
-# typedef enum {
-# UNSAFE, UNACKED, ACKED, HMACK, HMCTL, SERVACK, SERVNAK, CLIENTACK, STAT
-# } ZNotice_Kind_t;
-# extern const char *ZNoticeKinds[9];
-
-class ZNotice_Kind_t(Enum):
- _values_ = [
- "UNSAFE", "UNACKED", "ACKED", "HMACK", "HMCTL", "SERVACK", "SERVNAK", "CLIENTACK", "STAT",
- ]
-populate_enum(ZNotice_Kind_t)
-
-def pformat_timeval(tv_sec, tv_usec):
- """format timeval parts as seconds and human-readable time"""
- try:
- timestr = time.ctime(tv_sec)
- except ValueError:
- timestr = "invalid unix time"
- if tv_usec >= 1000000 or tv_usec < 0:
- # invalid usec, still treat as numbers
- return ["%dsec, %dusec (bad) (%s)" % (tv_sec, tv_usec, timestr)]
- return ["%d.%06dsec (%s)" % (tv_sec, tv_usec, timestr)]
-
-# struct _ZTimeval {
-class _ZTimeval(Structure):
- _fields_ = [
-# int tv_sec;
- ("tv_sec", c_int),
-# int tv_usec;
- ("tv_usec", c_int),
-# };
- ]
- def pformat(self):
- return pformat_timeval(self.tv_sec, self.tv_usec)
-
-
-class _ZTimeval_Net(_ZTimeval):
- """When _ZTimeval is used in a ZUnique_Id_t, the time parts are
- stored in network byte order. Handle this by faking up a different type."""
- def pformat(self):
- return pformat_timeval(socket.ntohl(self.tv_sec), socket.ntohl(self.tv_usec))
-
-# typedef struct _ZUnique_Id_t {
-class ZUnique_Id_t(Structure):
- _fields_ = [
- # struct in_addr zuid_addr;
- ("zuid_addr", in_addr),
- # struct _ZTimeval tv;
- ("tv", _ZTimeval_Net),
- # } ZUnique_Id_t;
- ]
-
-# union {
-class _U_z_sender_sockaddr(Union):
- _fields_ = [
- # struct sockaddr sa;
- ("sa", sockaddr),
- # struct sockaddr_in ip4;
- ("ip4", sockaddr_in),
- # struct sockaddr_in6 ip6;
- ("ip6", sockaddr_in6),
- # } z_sender_sockaddr;
- ]
- def pprint(self, indent):
- print
- if self.sa.sa_family.value == socket.AF_INET:
- ctypes_pprint(self.ip4, indent + ".ip4:")
- elif self.sa.sa_family.value == socket.AF_INET6:
- ctypes_pprint(self.ip6, indent + ".ip6:")
- else:
- ctypes_pprint(self.sa, indent + ".sa:")
-
-# typedef struct _ZNotice_t {
-class ZNotice_t(Structure):
- _fields_ = [
- # char *z_packet;
- ("z_packet", c_char_p),
- # char *z_version;
- ("z_version", c_char_p),
- # ZNotice_Kind_t z_kind;
- ("z_kind", ZNotice_Kind_t),
- # ZUnique_Id_t z_uid;
- ("z_uid", ZUnique_Id_t),
- # union {
- # struct sockaddr sa;
- # struct sockaddr_in ip4;
- # struct sockaddr_in6 ip6;
- # } z_sender_sockaddr;
- ("z_sender_sockaddr", _U_z_sender_sockaddr),
-
- # /* heavily deprecated: */
- # #define z_sender_addr z_sender_sockaddr.ip4.sin_addr
- # /* probably a bad idea?: */
- # struct _ZTimeval z_time;
- ("z_time", _ZTimeval),
- # unsigned short z_port;
- ("z_port", c_ushort),
- # unsigned short z_charset;
- ("z_charset", c_ushort),
- # int z_auth;
- ("z_auth", c_int),
- # int z_checked_auth;
- # TODO: fake enum, for display
- ("z_checked_auth", c_int),
- # int z_authent_len;
- ("z_authent_len", c_int),
- # char *z_ascii_authent;
- ("z_ascii_authent", c_char_p),
- # char *z_class;
- ("z_class", c_char_p),
- # char *z_class_inst;
- ("z_class_inst", c_char_p),
- # char *z_opcode;
- ("z_opcode", c_char_p),
- # char *z_sender;
- ("z_sender", c_char_p),
- # char *z_recipient;
- ("z_recipient", c_char_p),
- # char *z_default_format;
- ("z_default_format", c_char_p),
- # char *z_multinotice;
- ("z_multinotice", c_char_p),
- # ZUnique_Id_t z_multiuid;
- ("z_multiuid", ZUnique_Id_t),
- # ZChecksum_t z_checksum;
- ("z_checksum", c_uint),
- # char *z_ascii_checksum;
- ("z_ascii_checksum", c_char_p),
- # int z_num_other_fields;
- ("z_num_other_fields", c_int),
- # char *z_other_fields[Z_MAXOTHERFIELDS];
- ("z_other_fields", c_char_p * Z_MAXOTHERFIELDS),
- # caddr_t z_message;
- ("z_message", c_char_p), # not 1980
- # int z_message_len;
- ("z_message_len", c_int),
- # int z_num_hdr_fields;
- ("z_num_hdr_fields", c_int),
- # char **z_hdr_fields;
- ("z_hdr_fields", POINTER(c_char_p)),
- # } ZNotice_t;
- ]
- def pformat_z_other_fields(self):
- return ["%d: %s" % (n, self.z_other_fields[n])
- for n in range(Z_MAXOTHERFIELDS)]
- def pformat_z_hdr_fields(self):
- if not self.z_hdr_fields:
- return ["NULL"]
- return ["%d: %s" % (n, self.z_hdr_fields[n])
- for n in range(self.z_num_hdr_fields)]
-
-class libZephyr(object):
- """wrappers for functions in libZephyr"""
- testable_funcs = [
- "ZInitialize",
- "ZGetFD",
- "ZGetRealm",
- "ZGetSender",
- "Z_FormatRawHeader",
- "ZParseNotice",
- "ZFormatNotice",
- "ZCompareUID",
- "ZExpandRealm",
- "ZGetCharsetString",
- "ZGetCharset",
- "ZCharsetToString",
- "ZTransliterate",
- "ZOpenPort",
- "ZClosePort",
- "ZMakeAscii",
- "ZMakeZcode",
- "ZGetDestAddr",
- "ZSetFD",
- "ZPending",
- ]
- def __init__(self, library_path=None):
- """connect to the library and build the wrappers"""
- if not library_path:
- library_path = ctypes.util.find_library("zephyr")
- self._lib = ctypes.cdll.LoadLibrary(library_path)
-
- # grab the Zauthtype variable
- self.Zauthtype = ctypes.c_int.in_dll(self._lib, 'Zauthtype').value
-
- # generic bindings?
- for funcname in self.testable_funcs:
- setattr(self, funcname, getattr(self._lib, funcname))
-
- # TODO: fix return types, caller types in a more generic way later
- # (perhaps by parsing the headers or code)
- # perhaps metaprogramming or decorators...
- self.ZGetRealm.restype = ctypes.c_char_p
- self.ZGetSender.restype = ctypes.c_char_p
-
- # Code_t
- # Z_FormatRawHeader(ZNotice_t *notice,
- # char *buffer,
- # int buffer_len,
- # int *len,
- # char **cstart,
- # char **cend)
- # This stuffs a notice into a buffer; cstart/cend point into the checksum in buffer
- self.Z_FormatRawHeader.argtypes = [
- c_void_p, # *notice
- c_char_p, # *buffer
- c_int, # buffer_len
- POINTER(c_int), # *len
- POINTER(c_char_p), # **cstart
- POINTER(c_char_p), # **cend
- ]
-
- # Code_t
- # ZParseNotice(char *buffer,
- # int len,
- # ZNotice_t *notice)
- self.ZParseNotice.argtypes = [
- c_char_p, # *buffer
- c_int, # len
- POINTER(ZNotice_t), # *notice
- ]
-
- # Code_t
- # ZFormatNotice(register ZNotice_t *notice,
- # char **buffer,
- # int *ret_len,
- # Z_AuthProc cert_routine)
- self.ZFormatNotice.argtypes = [
- POINTER(ZNotice_t), # *notice
- POINTER(c_char_p), # **buffer
- POINTER(c_int), # *ret_len
- c_void_p, # cert_routine
- ]
-
- # int
- # ZCompareUID(ZUnique_Id_t *uid1,
- # ZUnique_Id_t *uid2)
- self.ZCompareUID.argtypes = [
- POINTER(ZUnique_Id_t), # *uid1
- POINTER(ZUnique_Id_t), # *uid2
- ]
-
- # char *
- # ZExpandRealm(realm)
- # char *realm; # mmm 80's
- self.ZExpandRealm.restype = c_char_p
- self.ZExpandRealm.argtypes = [
- c_char_p, # realm
- ]
-
- # unsigned short
- # ZGetCharset(char *charset)
- self.ZGetCharset.restype = c_ushort
- self.ZGetCharset.argtypes = [
- c_char_p, # charset
- ]
-
- # const char *
- # ZCharsetToString(unsigned short charset)
- self.ZCharsetToString.restype = c_char_p
- self.ZCharsetToString.argtypes = [
- c_ushort, # charset
- ]
-
- # Code_t
- # ZTransliterate(char *in,
- # int inlen,
- # char *inset,
- # char *outset,
- # char **out,
- # int *outlen)
- self.ZTransliterate.argtypes = [
- c_char_p, # in
- c_int, # inlnet,
- c_char_p, # inset
- c_char_p, # outset
- POINTER(c_char_p), # out
- POINTER(c_int), # outlen
- ]
-
- # Code_t ZOpenPort(u_short *port)
- self.ZOpenPort.argtypes = [
- POINTER(c_ushort), # port
- ]
-
- # const char *
- # ZGetCharsetString(char *charset)
- self.ZGetCharsetString.restype = c_char_p
- self.ZGetCharsetString.argtypes = [
- c_char_p, # charset
- ]
-
- # Code_t
- # ZMakeAscii(register char *ptr,
- # int len,
- # unsigned char *field,
- # int num)
- self.ZMakeAscii.argtypes = [
- c_char_p, # ptr
- c_int, # len
- c_char_p, # field; c_uchar_p?
- c_int, # num
- ]
-
- # Code_t
- # ZMakeZcode(register char *ptr,
- # int len,
- # unsigned char *field,
- # int num)
- self.ZMakeZcode.argtypes = [
- c_char_p, # ptr
- c_int, # len
- c_char_p, # field; c_uchar_p?
- c_int, # num
- ]
-
- # struct sockaddr_in ZGetDestAddr (void) {
- self.ZGetDestAddr.restype = sockaddr_in
-
- # library-specific setup...
- self.ZInitialize()
-
def py_make_ascii(input):
"""reference ZMakeAscii expressed as python..."""
hexes = ["%02X" % ord(ch) for ch in input]