You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
320 lines
8.3 KiB
320 lines
8.3 KiB
import socket
|
|
import struct
|
|
import ipaddress
|
|
from io import BytesIO
|
|
from datetime import datetime
|
|
from ctypes import (
|
|
c_ushort as _c_ushort,
|
|
c_ubyte as _c_ubyte,
|
|
c_uint8 as _c_uint8,
|
|
c_uint16 as _c_uint16,
|
|
c_uint32 as _c_uint32,
|
|
c_uint64 as _c_uint64,
|
|
c_short as _c_short,
|
|
c_byte as _c_byte,
|
|
c_int8 as _c_int8,
|
|
c_int16 as _c_int16,
|
|
c_int32 as _c_int32,
|
|
c_int64 as _c_int64,
|
|
)
|
|
|
|
from levin.exceptions import BadArgumentException
|
|
|
|
|
|
class _CType:
|
|
def __init__(self, value, endian=None):
|
|
self.value = value
|
|
self.endian = endian
|
|
|
|
@classmethod
|
|
def from_buffer(cls, buffer, endian: str = 'little', padding: bytes = None):
|
|
if isinstance(buffer, BytesIO):
|
|
buffer = buffer.read(cls.NBYTES)
|
|
elif isinstance(buffer, socket.socket):
|
|
buffer = buffer.recv(cls.NBYTES)
|
|
|
|
if len(buffer) < cls.NBYTES:
|
|
if not padding:
|
|
_bytes = 'bytes' if cls.NBYTES > 1 else 'byte'
|
|
raise BadArgumentException("requires a buffer of %d %s, received %d" % (cls.NBYTES, _bytes, len(buffer)))
|
|
|
|
func_padding = bytes.ljust if endian == 'little' else bytes.rjust
|
|
buffer = func_padding(buffer, cls.NBYTES, padding)
|
|
|
|
_endian = '<' if endian == 'little' else '>'
|
|
type_struct = cls.TYPE_STRUCT if hasattr(cls, 'TYPE_STRUCT') else cls.TYPE._type_
|
|
value = struct.unpack('%s%s' % (_endian, type_struct), buffer)[0]
|
|
return cls(value, endian)
|
|
|
|
def to_bytes(self):
|
|
if isinstance(self.value, (int, bool)):
|
|
type_struct = self.TYPE_STRUCT if hasattr(self, 'TYPE_STRUCT') else self.TYPE._type_
|
|
return struct.pack('%s%s' % ('<' if self.endian == 'little' else '>', type_struct), self.value)
|
|
elif isinstance(self.value, bytes):
|
|
if self.endian == 'little':
|
|
return self.value[::-1]
|
|
return self.value
|
|
|
|
def _overflows(self):
|
|
if isinstance(self.value, int):
|
|
try:
|
|
self.value.to_bytes(self.NBYTES, byteorder=self.endian, signed=self.SIGNED)
|
|
except OverflowError as e:
|
|
raise OverflowError("Value \'%d\' does not fit in %s." % (self.value, self.__class__.__name__))
|
|
|
|
def __len__(self):
|
|
if isinstance(self.value, bytes):
|
|
return len(self.value)
|
|
if isinstance(self.value, int):
|
|
return len(bytes(self))
|
|
|
|
return len(bytes(self.value))
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(self.value, int):
|
|
return self.value == other
|
|
|
|
def __ne__(self, other):
|
|
if isinstance(other, _CType):
|
|
other = other.value
|
|
|
|
if isinstance(self.value, int):
|
|
return self.value != other
|
|
|
|
def __and__(self, other):
|
|
if isinstance(other, _CType):
|
|
other = other.value
|
|
|
|
if isinstance(self.value, int):
|
|
# Bitwise AND
|
|
return self.value & other
|
|
raise NotImplementedError()
|
|
|
|
def __lshift__(self, other):
|
|
if isinstance(other, _CType):
|
|
other = other.value
|
|
|
|
if isinstance(self.value, int):
|
|
# Bitwise Left Shift
|
|
return self.value << other
|
|
raise NotImplementedError()
|
|
|
|
def __rshift__(self, other):
|
|
if isinstance(other, _CType):
|
|
other = other.value
|
|
|
|
if isinstance(self.value, int):
|
|
# Bitwise Right Shift
|
|
return self.value >> other
|
|
raise NotImplementedError()
|
|
|
|
def __rlshift__(self, other):
|
|
if isinstance(other, _CType):
|
|
other = other.value
|
|
|
|
if isinstance(self.value, int):
|
|
# Bitwise Right Shift
|
|
return self.value << other
|
|
raise NotImplementedError()
|
|
|
|
def __rrshift__(self, other):
|
|
if isinstance(other, _CType):
|
|
other = other.value
|
|
|
|
if isinstance(self.value, int):
|
|
# Bitwise Left Shift
|
|
return self.value >> other
|
|
raise NotImplementedError()
|
|
|
|
def __or__(self, other):
|
|
if isinstance(other, _CType):
|
|
other = other.value
|
|
|
|
if isinstance(self.value, int):
|
|
# Bitwise OR
|
|
return self.value | other
|
|
raise NotImplementedError()
|
|
|
|
def __mod__(self, other):
|
|
if isinstance(other, _CType):
|
|
other = other.value
|
|
|
|
if isinstance(self.value, int):
|
|
return self.value % other
|
|
|
|
def __add__(self, other):
|
|
if isinstance(other, _CType):
|
|
other = other.value
|
|
|
|
if isinstance(self.value, int):
|
|
return self.value + other
|
|
|
|
def __radd__(self, other):
|
|
if isinstance(other, _CType):
|
|
other = other.value
|
|
|
|
if isinstance(self.value, int):
|
|
return self.value + other
|
|
|
|
def __gt__(self, other):
|
|
if isinstance(other, _CType):
|
|
other = other.value
|
|
|
|
if isinstance(self.value, int):
|
|
return self.value > other
|
|
|
|
def __lt__(self, other):
|
|
if isinstance(other, _CType):
|
|
other = other.value
|
|
|
|
if isinstance(self.value, int):
|
|
return self.value < other
|
|
|
|
def __bytes__(self):
|
|
return self.to_bytes()
|
|
|
|
def __hash__(self):
|
|
return self.value
|
|
|
|
def __int__(self):
|
|
return self.value
|
|
|
|
|
|
class _IntType(_CType):
|
|
def __init__(self, value, endian='little'):
|
|
super(_IntType, self).__init__(value, endian)
|
|
self._overflows()
|
|
|
|
def __repr__(self):
|
|
_signed = 'unsigned' if not self.SIGNED else 'signed'
|
|
_bytes = 'bytes' if self.NBYTES > 1 else 'byte'
|
|
_cls_name = self.__class__.__name__
|
|
return '\'%d\' - %s - %s %d %s' % (self.value, _cls_name, _signed, self.NBYTES, _bytes)
|
|
|
|
|
|
class c_int16(_IntType):
|
|
TYPE = _c_short
|
|
NBYTES = 2
|
|
SIGNED = True
|
|
|
|
def __init__(self, value, endian='little'):
|
|
super(c_int16, self).__init__(value, endian)
|
|
|
|
|
|
class c_uint16(_IntType):
|
|
TYPE = _c_ushort
|
|
NBYTES = 2
|
|
SIGNED = False
|
|
|
|
def __init__(self, value, endian='little'):
|
|
super(c_uint16, self).__init__(value, endian)
|
|
|
|
|
|
class c_int32(_IntType):
|
|
TYPE = _c_uint32
|
|
NBYTES = 4
|
|
SIGNED = True
|
|
|
|
def __init__(self, value, endian='little'):
|
|
super(c_int32, self).__init__(value, endian)
|
|
|
|
|
|
class c_uint32(_IntType):
|
|
TYPE = _c_uint32
|
|
NBYTES = 4
|
|
SIGNED = False
|
|
|
|
def __init__(self, value, endian='little'):
|
|
super(c_uint32, self).__init__(value, endian)
|
|
|
|
@property
|
|
def ipv4(self) -> str:
|
|
return str(ipaddress.IPv4Address(self.value))
|
|
|
|
@property
|
|
def ip(self) -> str:
|
|
return self.ipv4
|
|
|
|
|
|
class c_int64(_IntType):
|
|
TYPE = _c_uint64
|
|
TYPE_STRUCT = 'q'
|
|
NBYTES = 8
|
|
SIGNED = True
|
|
|
|
def __init__(self, value, endian='little'):
|
|
super(c_int64, self).__init__(value, endian)
|
|
|
|
@property
|
|
def date_utc(self):
|
|
return datetime.utcfromtimestamp(self.value)
|
|
|
|
|
|
class c_uint64(_IntType):
|
|
TYPE = _c_uint64
|
|
TYPE_STRUCT = 'Q'
|
|
NBYTES = 8
|
|
SIGNED = False
|
|
|
|
def __init__(self, value, endian='little'):
|
|
super(c_uint64, self).__init__(value, endian)
|
|
|
|
@property
|
|
def ipv6(self) -> str:
|
|
val = socket.inet_ntop(socket.AF_INET6, self.value)
|
|
if val.startswith("::ffff:"):
|
|
val = val[7:]
|
|
return val
|
|
|
|
@property
|
|
def ip(self) -> str:
|
|
return self.ipv6
|
|
|
|
@property
|
|
def date_utc(self):
|
|
return datetime.utcfromtimestamp(self.value)
|
|
|
|
|
|
class c_byte(_IntType):
|
|
TYPE = _c_byte
|
|
NBYTES = 1
|
|
SIGNED = True
|
|
|
|
def __init__(self, value, endian='little'):
|
|
super(c_byte, self).__init__(value, endian)
|
|
|
|
|
|
class c_bytes(_CType):
|
|
def __init__(self, value, endian='little'):
|
|
super(c_bytes, self).__init__(value, endian)
|
|
|
|
|
|
class c_ubyte(_IntType):
|
|
TYPE = _c_ubyte
|
|
NBYTES = 1
|
|
SIGNED = False
|
|
|
|
def __init__(self, value, endian='little'):
|
|
super(c_ubyte, self).__init__(value, endian)
|
|
|
|
|
|
class c_string(_CType):
|
|
ENCODING = 'ascii'
|
|
|
|
def __init__(self, value):
|
|
super(c_string, self).__init__(value)
|
|
if isinstance(value, bytes):
|
|
self.NBYTES = len(value)
|
|
else:
|
|
self.NBYTES = len(self.value.encode(self.ENCODING))
|
|
|
|
|
|
class c_bool(_CType):
|
|
TYPE_STRUCT = '?'
|
|
NBYTES = 1
|
|
|
|
def __init__(self, value, endian=None):
|
|
if value not in [True, False]:
|
|
raise BadArgumentException('bool')
|
|
super(c_bool, self).__init__(value)
|