commit
db857d981c
@ -0,0 +1,3 @@
|
||||
.idea/*
|
||||
*.pyc
|
||||
__pycache__
|
@ -0,0 +1,125 @@
|
||||
# WOWlet WS Client
|
||||
|
||||
Sample Python asyncio websocket client for communicating with
|
||||
wowlet running in "background mode" via websockets.
|
||||
|
||||
This library was made in a few hours, so it's not complete, and should
|
||||
serve as an example and PoC.
|
||||
|
||||
## Example
|
||||
|
||||
First, start [wowlet](https://git.wownero.com/wowlet/wowlet/):
|
||||
|
||||
./wowlet --background 127.0.0.1:42069
|
||||
|
||||
Then run the following script, it will:
|
||||
|
||||
1. Open a wallet
|
||||
2. Request a receiving address listing
|
||||
3. Run forever
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
|
||||
from wowlet_ws_client.client import WowletWSClient
|
||||
|
||||
|
||||
wowlet = WowletWSClient(host="127.0.0.1", port=42069, debug=True)
|
||||
|
||||
|
||||
@wowlet.event("walletList")
|
||||
async def on_wallet_list(data):
|
||||
print("wallet listing received")
|
||||
print(data)
|
||||
|
||||
|
||||
@wowlet.event("addressList")
|
||||
async def on_address_list(data):
|
||||
print("address listing received")
|
||||
print(data)
|
||||
|
||||
|
||||
@wowlet.event("transactionHistory")
|
||||
async def on_transaction_history(data):
|
||||
print(data)
|
||||
|
||||
|
||||
@wowlet.event("addressBook")
|
||||
async def on_address_book(data):
|
||||
print(data)
|
||||
|
||||
|
||||
@wowlet.event("synchronized")
|
||||
async def on_synchronized(data):
|
||||
# called when the wallet is synchronized with the network
|
||||
print(data)
|
||||
|
||||
|
||||
@wowlet.event("transactionCommitted")
|
||||
async def on_transaction_commited(data):
|
||||
# when a transaction was sent
|
||||
print(data)
|
||||
|
||||
|
||||
async def main():
|
||||
await wowlet.connect()
|
||||
await asyncio.sleep(2)
|
||||
|
||||
await wowlet.open_wallet("/home/user/Wownero/wallets/wstest.keys", "test")
|
||||
|
||||
await asyncio.sleep(3)
|
||||
await wowlet.address_list(0, 0)
|
||||
|
||||
# hack to keep the script running
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
```
|
||||
|
||||
## Creating a wallet
|
||||
|
||||
Default directory = wowlet's default wallet directory
|
||||
|
||||
```python
|
||||
@wowlet.event("walletCreated")
|
||||
async def on_wallet_created(data):
|
||||
print(data)
|
||||
|
||||
@wowlet.event("walletCreatedError")
|
||||
async def on_wallet_created_error(data):
|
||||
print("error")
|
||||
print(data)
|
||||
|
||||
await wowlet.create_wallet("cool_wallet", password="sekrit")
|
||||
```
|
||||
|
||||
## Creating a transaction
|
||||
|
||||
```python
|
||||
address = "WW2cyFaAu4NSpZdpEP3egWcK99QqU8afJEpfZW1HNxofB9sH86Nir5pes7ofwAE3ZWVNdgxFrWeSiSiSLumSeZ7538zCXb6gp"
|
||||
await wowlet.send_transaction(
|
||||
address=address,
|
||||
amount=2,
|
||||
description="test transaction from Python")
|
||||
```
|
||||
|
||||
## QR
|
||||
|
||||
This library can also generate QR codes because why not.
|
||||
|
||||
```python
|
||||
import aiofiles
|
||||
from wowlet_ws_client.qr import qr_png
|
||||
|
||||
addr = "some_address_here"
|
||||
qr_image = qr_png(addr)
|
||||
|
||||
async with aiofiles.open('output.png', mode="wb") as f:
|
||||
await f.write(qr_image.getbuffer())
|
||||
```
|
||||
|
||||
## License
|
||||
|
||||
BSD
|
@ -0,0 +1,5 @@
|
||||
aiohttp
|
||||
aiofiles
|
||||
pyqrcode
|
||||
pypng
|
||||
pillow-simd
|
@ -0,0 +1,10 @@
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
# Copyright (c) 2021, The Wownero Project.
|
||||
|
||||
COIN_ADDRESS_LENGTH = [97, 108]
|
||||
COINCODE = 'WOW'
|
||||
|
||||
# QR image related
|
||||
PIL_IMAGE_DIMENSIONS = (512, 512)
|
||||
PIL_QUALITY = 50
|
||||
PIL_OPTIMIZE = True
|
@ -0,0 +1,44 @@
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
# Copyright (c) 2021, The Wownero Project.
|
||||
|
||||
"""
|
||||
wowlet-ws-client
|
||||
-------------
|
||||
Sample websocket client for communicating with
|
||||
wowlet running in "background mode" via websockets.
|
||||
"""
|
||||
from setuptools import setup
|
||||
|
||||
with open('README.md') as f:
|
||||
long_description = f.read()
|
||||
|
||||
|
||||
INSTALL_REQUIRES = open("requirements.txt").read().splitlines()
|
||||
|
||||
setup(
|
||||
name='wowlet_ws_client',
|
||||
version='0.0.1',
|
||||
url='https://git.wownero.com/wownero/wowlet-ws-client',
|
||||
author='dsc',
|
||||
author_email='dsc@xmr.pm',
|
||||
description='Sample websocket client for communicating with wowlet running in "background mode" via websockets.',
|
||||
long_description=long_description,
|
||||
long_description_content_type='text/markdown',
|
||||
packages=['wowlet_ws_client'],
|
||||
zip_safe=False,
|
||||
include_package_data=True,
|
||||
platforms='any',
|
||||
install_requires=INSTALL_REQUIRES,
|
||||
tests_require=INSTALL_REQUIRES + ["asynctest", "hypothesis", "pytest", "pytest-asyncio"],
|
||||
extras_require={"dotenv": ["python-dotenv"]},
|
||||
classifiers=[
|
||||
'Environment :: Web Environment',
|
||||
'Intended Audience :: Developers',
|
||||
'Operating System :: OS Independent',
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.7",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules'
|
||||
]
|
||||
)
|
@ -0,0 +1,24 @@
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
# Copyright (c) 2021, The Wownero Project.
|
||||
|
||||
from typing import Dict
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class WowletState(Enum):
|
||||
IDLE = ["walletList", "walletClosed", "walletOpenedError", "walletCreatedError"]
|
||||
OPENED = ["walletOpened"]
|
||||
BOOTSTRAP = ["refreshSync"]
|
||||
SYNCHRONIZED = ["synchronized", "transactionError"]
|
||||
CREATING_TX = ["transactionStarted"]
|
||||
|
||||
|
||||
WALLET_OPERATIONAL = [WowletState.OPENED, WowletState.BOOTSTRAP, WowletState.SYNCHRONIZED]
|
||||
|
||||
|
||||
def decorator_parametrized(dec):
|
||||
def layer(*args, **kwargs):
|
||||
def repl(view_func):
|
||||
return dec(args[0], view_func, *args[1:], **kwargs)
|
||||
return repl
|
||||
return layer
|
@ -0,0 +1,188 @@
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
# Copyright (c) 2021, The Wownero Project.
|
||||
|
||||
import logging
|
||||
from typing import Awaitable, Callable, Dict, Optional, Union, List
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
import aiohttp
|
||||
|
||||
from wowlet_ws_client import WowletState, decorator_parametrized, WALLET_OPERATIONAL
|
||||
|
||||
|
||||
class WowletWSClient:
|
||||
def __init__(self, host: str = "127.0.0.1", port: int = 42069, debug: bool = False):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.ses = aiohttp.ClientSession()
|
||||
self.debug = debug
|
||||
|
||||
self._ws = None
|
||||
|
||||
self._state = WowletState.IDLE
|
||||
|
||||
self._cmd_callbacks: Dict[str, Callable] = {}
|
||||
|
||||
self._loop_task: Callable = None
|
||||
|
||||
self.wallets: List[str] = []
|
||||
self.addresses: List[str] = []
|
||||
self.address_book: List[Dict[str: str]] = []
|
||||
self.unlocked_balance: float
|
||||
self.balance: float
|
||||
|
||||
async def open_wallet(self, path: str, password: str = ""):
|
||||
"""
|
||||
Opens a wallet
|
||||
:param path: absolute path to wallet file
|
||||
:param password: wallet password
|
||||
:return:
|
||||
"""
|
||||
if self._state != WowletState.IDLE:
|
||||
raise Exception("You may only open wallets in IDLE "
|
||||
"state. Close the current wallet first.")
|
||||
await self.sendMessage("openWallet", {
|
||||
"path": path,
|
||||
"password": password
|
||||
})
|
||||
|
||||
async def close_wallet(self):
|
||||
"""Close currently opened wallet"""
|
||||
if self._state not in WALLET_OPERATIONAL:
|
||||
raise Exception("There is no opened wallet to close.")
|
||||
await self.sendMessage("closeWallet", {})
|
||||
|
||||
async def address_list(self, account_index: int = 0, address_index: int = 0, limit: int = 50, offset: int = 0):
|
||||
if self._state not in WALLET_OPERATIONAL:
|
||||
raise Exception("There is no opened wallet to close.")
|
||||
await self.sendMessage("addressList", {
|
||||
"accountIndex": account_index,
|
||||
"addressIndex": address_index,
|
||||
"limit": limit,
|
||||
"offset": offset
|
||||
})
|
||||
|
||||
async def send_transaction(self, address: str, amount: float, description: str = "", _all: bool = False):
|
||||
"""
|
||||
Send someone some monies.
|
||||
:param address: address
|
||||
:param amount: amount
|
||||
:param description: optional description
|
||||
:param _all: send all available funds (use with caution)
|
||||
:return:
|
||||
"""
|
||||
if amount <= 0:
|
||||
raise Exception("y u send 0")
|
||||
if self._state not in WALLET_OPERATIONAL:
|
||||
raise Exception("There is no opened wallet to close.")
|
||||
await self.sendMessage("sendTransaction", {
|
||||
"address": address,
|
||||
"description": description,
|
||||
"amount": amount,
|
||||
"all": _all
|
||||
})
|
||||
|
||||
async def create_wallet(self, name: str, password: str = "", path: str = None):
|
||||
"""
|
||||
Automatically create a new wallet. Comes up with it's own
|
||||
mnemonic seed and stuffz.
|
||||
:param name: name of the new wallet
|
||||
:param password: (optional) password for the wallet
|
||||
:param path: (optional) absolute path *to a directory*, default is
|
||||
Wownero wallet directory.
|
||||
:return:
|
||||
"""
|
||||
if self._state != WowletState.IDLE:
|
||||
raise Exception("Please close the currently opened wallet first.")
|
||||
|
||||
await self.sendMessage("createWallet", {
|
||||
"name": name,
|
||||
"path": path,
|
||||
"password": password
|
||||
})
|
||||
|
||||
async def get_transaction_history(self):
|
||||
"""Get transaction history in its entirety."""
|
||||
# if self._state != WALLET_OPERATIONAL:
|
||||
# raise Exception("The wallet is currently busy doing something else.")
|
||||
await self.sendMessage("transactionHistory", {})
|
||||
|
||||
async def get_address_book(self):
|
||||
"""Get all address book entries."""
|
||||
# if self._state != WALLET_OPERATIONAL:
|
||||
# raise Exception("The wallet is currently busy doing something else.")
|
||||
await self.sendMessage("addressBook", {})
|
||||
|
||||
async def address_book_insert(self, name: str, address: str):
|
||||
if not name or not address:
|
||||
raise Exception("name or address empty")
|
||||
await self.sendMessage("addressBookItemAdd", {
|
||||
"name": name,
|
||||
"address": address
|
||||
})
|
||||
|
||||
async def address_book_remove(self, address: str):
|
||||
"""Remove by address"""
|
||||
if not address:
|
||||
raise Exception("address empty")
|
||||
await self.sendMessage("addressBookItemRemove", {
|
||||
"address": address
|
||||
})
|
||||
|
||||
async def create_wallet_advanced(self):
|
||||
pass
|
||||
|
||||
async def loop(self):
|
||||
while True:
|
||||
buffer = await self._ws.receive()
|
||||
msg = json.loads(buffer.data)
|
||||
|
||||
if "cmd" not in msg:
|
||||
logging.error("invalid message received")
|
||||
continue
|
||||
|
||||
cmd = msg['cmd']
|
||||
data = msg.get("data")
|
||||
if self.debug:
|
||||
print(msg['cmd'])
|
||||
|
||||
for k, v in WowletState.__members__.items():
|
||||
state = WowletState[k]
|
||||
if cmd in v.value and self._state != state:
|
||||
if self.debug:
|
||||
print(f"Changing state to {k}")
|
||||
self._state = state
|
||||
|
||||
if cmd == "walletList":
|
||||
self.wallets = data
|
||||
elif cmd == "addressBook":
|
||||
self.address_book = data
|
||||
elif cmd == "addressList":
|
||||
if data.get("accountIndex", 0) == 0 and \
|
||||
data.get("addressIndex") == 0 and \
|
||||
data.get("offset") == 0:
|
||||
self.addresses = data
|
||||
elif cmd == "balanceUpdated":
|
||||
self.balance = data.get('balance', 0)
|
||||
self.balance_unlocked = data.get('spendable', 0)
|
||||
|
||||
if cmd in self._cmd_callbacks:
|
||||
await self._cmd_callbacks[cmd](data)
|
||||
|
||||
@decorator_parametrized
|
||||
def event(self, view_func, cmd_name):
|
||||
self._cmd_callbacks[cmd_name] = view_func
|
||||
|
||||
async def connect(self):
|
||||
self._ws = await self.ses.ws_connect(f"ws://{self.host}:{self.port}")
|
||||
self._loop_task = asyncio.create_task(self.loop())
|
||||
|
||||
async def send(self, data: bytes) -> None:
|
||||
return await self._ws.send_bytes(data)
|
||||
|
||||
async def sendMessage(self, command: str, data: dict) -> None:
|
||||
return await self.send(json.dumps({
|
||||
"cmd": command,
|
||||
"data": data
|
||||
}).encode())
|
@ -0,0 +1,66 @@
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
# Copyright (c) 2021, The Wownero Project.
|
||||
|
||||
import os
|
||||
from io import BytesIO
|
||||
|
||||
import pyqrcode
|
||||
from PIL import Image, ImageDraw
|
||||
|
||||
import settings
|
||||
|
||||
|
||||
def qr_png(
|
||||
address: str,
|
||||
size=settings.PIL_IMAGE_DIMENSIONS,
|
||||
color_from=(210, 83, 200),
|
||||
color_to=(255, 169, 62)
|
||||
) -> BytesIO:
|
||||
"""Pretty QR c0dez j00"""
|
||||
created = pyqrcode.create(address, error='L')
|
||||
buffer = BytesIO()
|
||||
result = BytesIO()
|
||||
created.png(buffer, scale=14, quiet_zone=2)
|
||||
|
||||
im = Image.open(buffer)
|
||||
im = im.convert("RGBA")
|
||||
im.thumbnail(size)
|
||||
|
||||
im_data = im.getdata()
|
||||
|
||||
# make black color transparent
|
||||
im_transparent = []
|
||||
for color_point in im_data:
|
||||
if sum(color_point[:3]) == 255 * 3:
|
||||
im_transparent.append(color_point)
|
||||
else:
|
||||
# get rid of the subtle grey borders
|
||||
alpha = 0 if color_from and color_to else 1
|
||||
im_transparent.append((0, 0, 0, alpha))
|
||||
continue
|
||||
|
||||
if not color_from and not color_to:
|
||||
im.save(result, format="PNG", quality=settings.PIL_QUALITY, optimize=settings.PIL_OPTIMIZE)
|
||||
result.seek(0)
|
||||
return result
|
||||
|
||||
# turn QR into a gradient
|
||||
im.putdata(im_transparent)
|
||||
|
||||
gradient = Image.new('RGBA', im.size, color=0)
|
||||
draw = ImageDraw.Draw(gradient)
|
||||
|
||||
for i, color in enumerate(gradient_interpolate(color_from, color_to, im.width * 2)):
|
||||
draw.line([(i, 0), (0, i)], tuple(color), width=1)
|
||||
|
||||
im_gradient = Image.alpha_composite(gradient, im)
|
||||
im_gradient.save(result, format="PNG", quality=settings.PIL_QUALITY, optimize=settings.PIL_OPTIMIZE)
|
||||
|
||||
result.seek(0)
|
||||
return result
|
||||
|
||||
|
||||
def gradient_interpolate(color_from, color_to, interval):
|
||||
det_co = [(t - f) / interval for f, t in zip(color_from, color_to)]
|
||||
for i in range(interval):
|
||||
yield [round(f + det * i) for f, det in zip(color_from, det_co)]
|
Loading…
Reference in new issue