# SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2020, The Monero Project. # Copyright (c) 2020, dsc@xmr.pm import re from typing import Union from collections import Counter from functools import partial import aiohttp import settings from wowlet_backend.utils import httpget, popularity_contest from wowlet_backend.tasks import WowletTask class BlockheightTask(WowletTask): """ Fetch latest blockheight using webcrawling. We pick the most popular height from a list of websites. Arguably this approach has benefits over querying a (local) Monero RPC instance, as that requires maintenance, while this solution assumes that (at least) 2 websites reports the correct height. """ def __init__(self, interval: int = 60): super(BlockheightTask, self).__init__(interval) self._cache_key = "blockheights" self._cache_expiry = 90 self._websocket_cmd = "blockheights" async def task(self) -> Union[dict, None]: from wowlet_backend.factory import app timeout = aiohttp.ClientTimeout(total=3) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(f'http://{settings.RPC_WOWNERO_HOST}:{settings.RPC_WOWNERO_PORT}/get_height') as resp: blob = await resp.json() return { 'mainnet': blob['height'] }