keyscan/key.py

75 lines
2.2 KiB
Python

import hashlib
import time
import asyncio
import os
import aiohttp
import bs4
from nio import AsyncClient, MatrixRoom, RoomMessageText
class App:
def __init__(self, user="", password="", room_id=""):
self.user = user
self.password = password
self.room_id = room_id
self.known = set()
async def setup(self):
self.client = AsyncClient("https://doesnt.social", self.user)
await self.client.login(self.password)
await self.send_message("hi there, i'm marvin")
async def send_message(self, message: str):
print(message)
resp = await self.client.room_send(
room_id=self.room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": message,
},
)
print(resp)
async def run(self):
await self.setup()
while True:
await self.update()
time.sleep(500)
async def update(self):
text = ""
async with aiohttp.ClientSession() as session:
url = (
"https://fundsuche02.kivbf.de//MyApp.asp"
"?wci=FundHeader&Mdt=Bamberg&format=&PLZ=96047"
"&KM=50&KATEGORIE=%7B705DA1A1-2E03-4B2A-A31D-4E78E713CD79%7D"
"&DATUM=01.09.2020&BESCHREIBUNG=&FILTER=A&MerkmaleData="
)
async with session.get(url) as resp:
text = await resp.text()
soup = bs4.BeautifulSoup(text, "html.parser")
elements = soup.find_all(
title="Link zum Suchergebnis, Link öffnet neues Fenster"
)
for element in elements:
description = element.text.strip()
digest = hashlib.sha256(description.encode()).hexdigest()
if self.is_new(digest):
await self.send_message(description)
def is_new(self, digest):
if digest in self.known:
return False
self.known.add(digest)
return True
if __name__ == "__main__":
app = App(
user=os.environ["USER"],
password=os.environ["PASSWORD"],
room_id=os.environ["ROOM_ID"],
)
asyncio.get_event_loop().run_until_complete(app.run())