You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

320 lines
8.3 KiB

import socket
import struct
import ipaddress
from io import BytesIO
from datetime import datetime
from ctypes import (
c_ushort as _c_ushort,
c_ubyte as _c_ubyte,
c_uint8 as _c_uint8,
c_uint16 as _c_uint16,
c_uint32 as _c_uint32,
c_uint64 as _c_uint64,
c_short as _c_short,
c_byte as _c_byte,
c_int8 as _c_int8,
c_int16 as _c_int16,
c_int32 as _c_int32,
c_int64 as _c_int64,
)
from levin.exceptions import BadArgumentException
class _CType:
    """Base wrapper pairing a raw value with the byte order used to (de)serialize it.

    Subclasses are expected to provide ``NBYTES`` and either ``TYPE_STRUCT``
    (a ``struct`` format character) or ``TYPE`` (an object whose ``_type_``
    attribute supplies that character).  ``_overflows`` additionally reads
    ``SIGNED``.

    Arithmetic, bitwise and ordering operators work on the wrapped integer and
    return plain Python values, not new wrapper instances.
    """

    def __init__(self, value, endian=None):
        """Store *value* and the byte order (``'little'`` or anything else for big)."""
        self.value = value
        self.endian = endian

    @staticmethod
    def _unwrap(operand):
        """Return the payload of a ``_CType`` operand, or the operand itself."""
        return operand.value if isinstance(operand, _CType) else operand

    @staticmethod
    def _struct_format(owner, endian):
        """Build the ``struct`` format string for *owner* (a class or an instance).

        ``TYPE_STRUCT`` wins over ``TYPE._type_`` when both are present.
        """
        prefix = '<' if endian == 'little' else '>'
        code = owner.TYPE_STRUCT if hasattr(owner, 'TYPE_STRUCT') else owner.TYPE._type_
        return '%s%s' % (prefix, code)

    @classmethod
    def from_buffer(cls, buffer, endian: str = 'little', padding: bytes = None):
        """Decode one value of this type from *buffer*.

        :param buffer: raw bytes, a ``BytesIO`` (``NBYTES`` are read from it) or a
            ``socket.socket`` (``NBYTES`` are requested via ``recv``).
        :param endian: ``'little'`` selects little-endian; any other value big-endian.
        :param padding: single fill byte used to extend a short buffer to
            ``NBYTES``; when falsy, a short buffer is an error.
        :raises BadArgumentException: fewer than ``NBYTES`` bytes and no padding.
        :return: a new instance of ``cls`` built as ``cls(value, endian)``.
        """
        if isinstance(buffer, BytesIO):
            buffer = buffer.read(cls.NBYTES)
        elif isinstance(buffer, socket.socket):
            buffer = buffer.recv(cls.NBYTES)

        if len(buffer) < cls.NBYTES:
            if not padding:
                _bytes = 'bytes' if cls.NBYTES > 1 else 'byte'
                raise BadArgumentException(
                    "requires a buffer of %d %s, received %d" % (cls.NBYTES, _bytes, len(buffer)))
            # Pad on the side that holds the most significant bytes.
            func_padding = bytes.ljust if endian == 'little' else bytes.rjust
            buffer = func_padding(buffer, cls.NBYTES, padding)

        value = struct.unpack(cls._struct_format(cls, endian), buffer)[0]
        return cls(value, endian)

    def to_bytes(self):
        """Serialize the payload.

        Integers (and bools) are packed with ``struct``; ``bytes`` payloads are
        returned as-is, reversed when the instance is little-endian.  Any other
        payload type yields ``None``.
        """
        if isinstance(self.value, (int, bool)):
            return struct.pack(self._struct_format(self, self.endian), self.value)
        if isinstance(self.value, bytes):
            return self.value[::-1] if self.endian == 'little' else self.value
        return None

    def _overflows(self):
        """Raise ``OverflowError`` when an integer payload does not fit in ``NBYTES``."""
        if isinstance(self.value, int):
            try:
                self.value.to_bytes(self.NBYTES, byteorder=self.endian, signed=self.SIGNED)
            except OverflowError:
                # The new message names the offending type; the original
                # int.to_bytes error adds nothing, so drop it from the chain.
                raise OverflowError(
                    "Value \'%d\' does not fit in %s." % (self.value, self.__class__.__name__)
                ) from None

    def __len__(self):
        """Return the payload size in bytes."""
        if isinstance(self.value, bytes):
            return len(self.value)
        if isinstance(self.value, int):
            return len(bytes(self))
        if isinstance(self.value, str):
            # bytes(str) needs an explicit encoding; use the subclass's
            # ENCODING when it declares one.
            return len(self.value.encode(getattr(self, 'ENCODING', 'ascii')))
        return len(bytes(self.value))

    def __eq__(self, other):
        """Compare payloads; another ``_CType`` is compared by its payload."""
        return self.value == self._unwrap(other)

    def __ne__(self, other):
        """Inverse of ``__eq__`` on the payloads."""
        return self.value != self._unwrap(other)

    def __and__(self, other):
        other = self._unwrap(other)
        if isinstance(self.value, int):
            # Bitwise AND
            return self.value & other
        raise NotImplementedError()

    def __lshift__(self, other):
        other = self._unwrap(other)
        if isinstance(self.value, int):
            # Bitwise Left Shift: self << other
            return self.value << other
        raise NotImplementedError()

    def __rshift__(self, other):
        other = self._unwrap(other)
        if isinstance(self.value, int):
            # Bitwise Right Shift: self >> other
            return self.value >> other
        raise NotImplementedError()

    def __rlshift__(self, other):
        other = self._unwrap(other)
        if isinstance(self.value, int):
            # Reflected Left Shift: other << self, so the wrapped value is the
            # shift count, not the value being shifted.
            return other << self.value
        raise NotImplementedError()

    def __rrshift__(self, other):
        other = self._unwrap(other)
        if isinstance(self.value, int):
            # Reflected Right Shift: other >> self.
            return other >> self.value
        raise NotImplementedError()

    def __or__(self, other):
        other = self._unwrap(other)
        if isinstance(self.value, int):
            # Bitwise OR
            return self.value | other
        raise NotImplementedError()

    # The operators below return None (rather than raising) when the payload
    # is not an integer; that pre-existing behaviour is kept for callers.

    def __mod__(self, other):
        other = self._unwrap(other)
        if isinstance(self.value, int):
            return self.value % other
        return None

    def __add__(self, other):
        other = self._unwrap(other)
        if isinstance(self.value, int):
            return self.value + other
        return None

    def __radd__(self, other):
        other = self._unwrap(other)
        if isinstance(self.value, int):
            return self.value + other
        return None

    def __gt__(self, other):
        other = self._unwrap(other)
        if isinstance(self.value, int):
            return self.value > other
        return None

    def __lt__(self, other):
        other = self._unwrap(other)
        if isinstance(self.value, int):
            return self.value < other
        return None

    def __bytes__(self):
        return self.to_bytes()

    def __hash__(self):
        # hash() of the payload, so bytes/str payloads are hashable too and the
        # result stays consistent with __eq__.
        return hash(self.value)

    def __int__(self):
        if isinstance(self.value, int):
            # Normalises bool payloads to a real int.
            return int(self.value)
        return self.value
class _IntType(_CType):
    """Integer flavour of ``_CType`` that range-checks its value on construction."""

    def __init__(self, value, endian='little'):
        """Store the value and run the overflow check right away."""
        super().__init__(value, endian)
        self._overflows()

    def __repr__(self):
        """Describe the value together with the type name, signedness and width."""
        signedness = 'signed' if self.SIGNED else 'unsigned'
        unit = 'byte' if self.NBYTES <= 1 else 'bytes'
        type_name = type(self).__name__
        return '\'%d\' - %s - %s %d %s' % (self.value, type_name, signedness, self.NBYTES, unit)
class c_int16(_IntType):
    """Signed integer stored in 2 bytes."""

    SIGNED = True
    NBYTES = 2
    TYPE = _c_short  # supplies the struct format character via ``_type_``

    def __init__(self, value, endian='little'):
        super().__init__(value, endian)
class c_uint16(_IntType):
    """Unsigned integer stored in 2 bytes."""

    SIGNED = False
    NBYTES = 2
    TYPE = _c_ushort  # supplies the struct format character via ``_type_``

    def __init__(self, value, endian='little'):
        super().__init__(value, endian)
class c_int32(_IntType):
    """Signed integer stored in 4 bytes."""

    # Must be the *signed* ctypes type: its ``_type_`` character is what the
    # base class hands to ``struct``.  With the unsigned type, negative values
    # could not be packed and unpacked values >= 2**31 failed the signed
    # overflow check.
    TYPE = _c_int32
    NBYTES = 4
    SIGNED = True

    def __init__(self, value, endian='little'):
        """Wrap *value*; the base initializer performs the range check."""
        super(c_int32, self).__init__(value, endian)
class c_uint32(_IntType):
    """Unsigned integer stored in 4 bytes, with helpers to view it as an IPv4 address."""

    SIGNED = False
    NBYTES = 4
    TYPE = _c_uint32  # supplies the struct format character via ``_type_``

    def __init__(self, value, endian='little'):
        super().__init__(value, endian)

    @property
    def ipv4(self) -> str:
        """Dotted-quad text produced by ``ipaddress.IPv4Address`` for the value."""
        address = ipaddress.IPv4Address(self.value)
        return str(address)

    @property
    def ip(self) -> str:
        """Alias for :attr:`ipv4`."""
        return self.ipv4
class c_int64(_IntType):
    """Signed integer stored in 8 bytes."""

    # Signed ctypes type, matching SIGNED below (previously the unsigned one).
    TYPE = _c_int64
    # Explicit struct character; the base class prefers TYPE_STRUCT over
    # ``TYPE._type_`` when both exist.
    TYPE_STRUCT = 'q'
    NBYTES = 8
    SIGNED = True

    def __init__(self, value, endian='little'):
        """Wrap *value*; the base initializer performs the range check."""
        super(c_int64, self).__init__(value, endian)

    @property
    def date_utc(self):
        """Interpret the value as a POSIX timestamp and return a naive UTC ``datetime``."""
        return datetime.utcfromtimestamp(self.value)
class c_uint64(_IntType):
    """Unsigned integer stored in 8 bytes, with address and timestamp views."""

    SIGNED = False
    NBYTES = 8
    TYPE_STRUCT = 'Q'  # explicit struct character, preferred over TYPE._type_
    TYPE = _c_uint64

    def __init__(self, value, endian='little'):
        super().__init__(value, endian)

    @property
    def ipv6(self) -> str:
        """Text form of the value as given by ``socket.inet_ntop(AF_INET6, ...)``.

        A leading ``::ffff:`` is removed from the result.
        """
        # NOTE(review): inet_ntop expects a packed 16-byte address; confirm
        # what callers store in ``self.value`` before relying on this.
        mapped_prefix = "::ffff:"
        text = socket.inet_ntop(socket.AF_INET6, self.value)
        if not text.startswith(mapped_prefix):
            return text
        return text[len(mapped_prefix):]

    @property
    def ip(self) -> str:
        """Alias for :attr:`ipv6`."""
        return self.ipv6

    @property
    def date_utc(self):
        """Interpret the value as a POSIX timestamp and return a naive UTC ``datetime``."""
        timestamp = self.value
        return datetime.utcfromtimestamp(timestamp)
class c_byte(_IntType):
    """Signed integer stored in a single byte."""

    SIGNED = True
    NBYTES = 1
    TYPE = _c_byte  # supplies the struct format character via ``_type_``

    def __init__(self, value, endian='little'):
        super().__init__(value, endian)
class c_bytes(_CType):
    """Wrapper whose default byte order is little-endian; adds no behaviour of its own."""

    def __init__(self, value, endian='little'):
        super().__init__(value, endian)
class c_ubyte(_IntType):
    """Unsigned integer stored in a single byte."""

    SIGNED = False
    NBYTES = 1
    TYPE = _c_ubyte  # supplies the struct format character via ``_type_``

    def __init__(self, value, endian='little'):
        super().__init__(value, endian)
class c_string(_CType):
    """String wrapper that records its encoded length in ``NBYTES``."""

    ENCODING = 'ascii'

    def __init__(self, value):
        """Store *value* (``bytes`` or text) and compute its size in bytes.

        Text is measured after encoding with ``ENCODING``; ``bytes`` are
        measured as given.
        """
        super().__init__(value)
        encoded = value if isinstance(value, bytes) else self.value.encode(self.ENCODING)
        self.NBYTES = len(encoded)
class c_bool(_CType):
    """Boolean stored in a single byte (struct format ``'?'``)."""

    NBYTES = 1
    TYPE_STRUCT = '?'

    def __init__(self, value, endian=None):
        """Accept only values equal to ``True`` or ``False``.

        The *endian* argument is accepted but not forwarded to the base class.

        :raises BadArgumentException: *value* equals neither ``True`` nor ``False``.
        """
        accepted = [True, False]
        if value in accepted:
            super().__init__(value)
            return
        raise BadArgumentException('bool')