Merge pull request #8 from python-monero/master

Added stat_info_request
master
xmrdsc 4 years ago committed by GitHub
commit affe64721c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

1
.gitignore vendored

@ -6,3 +6,4 @@ htmlcov
.idea
.autoenv
.venv
.vscode

@ -1,3 +1,4 @@
import logging
import sys
import struct
import socket
@ -8,6 +9,7 @@ from levin.constants import *
from levin.exceptions import BadArgumentException
from levin.ctypes import *
log = logging.getLogger()
class Bucket:
def __init__(self):
@ -54,9 +56,11 @@ class Bucket:
bucket.protocol_version = LEVIN_PROTOCOL_VER_1
@staticmethod
def create_handshake_request(my_port: int = 0, network_id: bytes = None,
peer_id: bytes = b'\x41\x41\x41\x41\x41\x41\x41\x41',
verbose=False):
def create_handshake_request(
my_port: int = 0,
network_id: bytes = None,
peer_id: bytes = b'\x41\x41\x41\x41\x41\x41\x41\x41',
):
"""
Helper function to create a handshake request. Does not require
parameters but you can use them to impersonate a legit node.
@ -67,17 +71,29 @@ class Bucket:
:return:
"""
handshake_section = Section.handshake_request(peer_id=peer_id, network_id=network_id, my_port=my_port)
bucket = Bucket.create_request(1001, section=handshake_section)
bucket = Bucket.create_request(P2P_COMMAND_HANDSHAKE.value, section=handshake_section)
if verbose:
print(">> created packet \'%s\'" % P2P_COMMANDS[bucket.command])
log.debug(">> created packet '%s'" % P2P_COMMANDS[bucket.command])
header = bucket.header()
body = bucket.payload()
return bucket
@staticmethod
def create_stat_info_request(
peer_id: bytes = b"\x41\x41\x41\x41\x41\x41\x41\x41",
):
stat_info_section = Section.stat_info_request(peer_id=peer_id)
log.debug(stat_info_section.entries["proof_of_trust"].entries.keys())
bucket = Bucket.create_request(
P2P_COMMAND_REQUEST_STAT_INFO.value, section=stat_info_section
)
log.debug(">> created packet '%s'" % P2P_COMMANDS[bucket.command])
return bucket
@classmethod
def from_buffer(cls, signature: c_uint64, sock: socket.socket, verbose: bool = False):
def from_buffer(cls, signature: c_uint64, sock: socket.socket):
if isinstance(signature, bytes):
signature = c_uint64(signature)
# if isinstance(buffer, bytes):
@ -111,14 +127,12 @@ class Bucket:
bucket.payload_section = None
if verbose:
print("<< received packet \'%s\'" % P2P_COMMANDS[bucket.command])
log.debug("<< received packet '%s'" % P2P_COMMANDS[bucket.command])
from levin.reader import LevinReader
bucket.payload_section = LevinReader(BytesIO(bucket.payload)).read_payload()
if verbose:
print("<< parsed packet \'%s\'" % P2P_COMMANDS[bucket.command])
log.debug("<< parsed packet '%s'" % P2P_COMMANDS[bucket.command])
return bucket
def header(self):

@ -20,12 +20,18 @@ P2P_COMMANDS_POOL_BASE = c_uint32(1000)
P2P_COMMAND_HANDSHAKE = c_uint32(P2P_COMMANDS_POOL_BASE + 1) # carries payload
P2P_COMMAND_TIMED_SYNC = c_uint32(P2P_COMMANDS_POOL_BASE + 2) # carries payload
P2P_COMMAND_PING = c_uint32(P2P_COMMANDS_POOL_BASE + 3)
P2P_COMMAND_REQUEST_STAT_INFO = c_uint32(P2P_COMMANDS_POOL_BASE + 4)
P2P_COMMAND_REQUEST_NETWORK_STATE = c_uint32(P2P_COMMANDS_POOL_BASE + 5)
P2P_COMMAND_REQUEST_PEER_ID = c_uint32(P2P_COMMANDS_POOL_BASE + 6)
P2P_COMMAND_REQUEST_SUPPORT_FLAGS = c_uint32(P2P_COMMANDS_POOL_BASE + 7)
P2P_COMMANDS = {
P2P_COMMAND_HANDSHAKE: 'handshake',
P2P_COMMAND_TIMED_SYNC: 'timed_sync',
P2P_COMMAND_PING: 'ping',
P2P_COMMAND_REQUEST_SUPPORT_FLAGS: 'support_flags'
P2P_COMMAND_REQUEST_SUPPORT_FLAGS: 'support_flags',
P2P_COMMAND_REQUEST_STAT_INFO: 'stat_info',
P2P_COMMAND_REQUEST_NETWORK_STATE: 'network_state',
P2P_COMMAND_REQUEST_PEER_ID: 'peer_id',
}
PORTABLE_STORAGE_SIGNATUREA = c_uint32(0x01011101)

@ -4,10 +4,19 @@ import ipaddress
from io import BytesIO
from datetime import datetime
from ctypes import (
c_ushort as _c_ushort, c_ubyte as _c_ubyte, c_uint8 as _c_uint8,
c_uint16 as _c_uint16, c_uint32 as _c_uint32, c_uint64 as _c_uint64,
c_short as _c_short, c_byte as _c_byte, c_int8 as _c_int8, c_int16 as _c_int16,
c_int32 as _c_int32, c_int64 as _c_int64)
c_ushort as _c_ushort,
c_ubyte as _c_ubyte,
c_uint8 as _c_uint8,
c_uint16 as _c_uint16,
c_uint32 as _c_uint32,
c_uint64 as _c_uint64,
c_short as _c_short,
c_byte as _c_byte,
c_int8 as _c_int8,
c_int16 as _c_int16,
c_int32 as _c_int32,
c_int64 as _c_int64,
)
from levin.exceptions import BadArgumentException

@ -55,6 +55,25 @@ class Section:
section.add("support_flags", P2P_SUPPORT_FLAGS)
return section
@classmethod
def stat_info_request(cls, peer_id: bytes = None):
if not peer_id:
peer_id = random.getrandbits(64)
signature = bytes.fromhex(
"418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3"
)
section = cls()
proof_of_trust = Section()
proof_of_trust.add("peer_id", c_uint64(peer_id))
proof_of_trust.add("time", c_uint64(int(time())))
proof_of_trust.add("sign", c_string(signature))
section.add("proof_of_trust", proof_of_trust)
return section
def __bytes__(self):
from levin.writer import LevinWriter

@ -7,7 +7,7 @@ from io import BytesIO
def ip2int(addr):
return struct.unpack("!I", socket.inetaton(addr))[0]
return struct.unpack("!I", socket.inet_aton(addr))[0]
def int2ip(addr):

Loading…
Cancel
Save