diff termbanator.py @ 0:f570713a7d31 default tip

start termbanator
author drewp@bigasterisk.com
date Sun, 25 Jun 2023 20:01:38 -0700
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/termbanator.py	Sun Jun 25 20:01:38 2023 -0700
@@ -0,0 +1,250 @@
+import datetime
+import os
+import pdb
+import subprocess
+import sys
+import time
+import logging
+import traceback
+from typing import Any, Optional, Tuple
+
+from javascript import On, Once, console, require
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] [line %(lineno)s] %(message)s")
+log = logging.getLogger()
+
+mineflayer = require("mineflayer", "latest")
+Vec3 = require("vec3").Vec3
+registry = require('prismarine-registry')('1.19.3')
+Block = require("prismarine-block")(registry)
+
+host, port, rconPort = ('bang', 25665, 25675)
+
+mfbot = mineflayer.createBot({
+    "host": host,
+    "port": port,
+    "username": 'termbanator',
+    'auth': 'offline',
+    'version': '1.19.3',
+})
+log.info('created')
+
+
+class Player:
+
+    def __init__(self, player):
+        self._p = player
+        if self._p is None:
+            raise TypeError(f'player was {player}')
+        self.username = self._p.username
+
+    def __repr__(self):
+        return f'Player({self.username})'
+
+    def position(self) -> Tuple[float, float, float]:
+        if self._p.entity is None:
+            return getDistantPlayerPos(self.username)
+        pos = self._p.entity.position
+        return (round(pos.x, 2), round(pos.y, 2), round(pos.z, 2))
+
+
+def getDistantPlayerPos(username) -> Tuple[float, float, float]:
+    out = subprocess.check_output([
+        "/home/drewp/Downloads/mcrcon/mcrcon", "-H", host, "-P",
+        str(rconPort), "-p", "111", f"/data get entity @p[name={username}] Pos"
+    ],
+                                  encoding='utf8')
+    words = out.split("the following entity data: ")[1].split('[')[1].split(
+        ']')[0].split(',')
+    return tuple(round(float(v.rstrip('d')), 2) for v in words)
+
+
+class Bot:
+
+    def __init__(self, mineflayer_bot: Any):
+        self._bot = mineflayer_bot
+        self.me = Player(self._bot)
+        mcData = require('minecraft-data')(self._bot.version)
+        self.pf_module = require('mineflayer-pathfinder')
+        self._bot.loadPlugin(self.pf_module.pathfinder)
+
+        self.movements = self.pf_module.Movements(self._bot, mcData)
+        self.movements.allow1by1towers = True
+        self.movements.canDig = False
+        self.movements.scafoldingBlocks.push(
+            self._bot.registry.itemsByName['dirt'].id)
+        self.movements.maxDropDown = 300
+        self.movements.allowFreeMotion = True
+        self.movements.canOpenDoors = True
+
+        self._bot.removeAllListeners('chat')
+
+        self.walkStartTime = 0
+        self.waitUntil = time.time() + 2
+        self.target: Optional[Player] = None
+        self.announcedToTarget = False
+
+    def chat(self, txt):
+        log.info(f'say {txt!r}')
+        self._bot.chat(txt)
+
+    def teleport(self, x, y, z):
+        self._bot.chat(f'/tp {x} {y} {z}')
+
+    def setPathfinderGoal(self, x, y, z, rng):
+        pos = self.me.position()
+        log.info(f'now at {pos}, pathfinding to {x} {y} {z} {rng=}')
+        goal = self.pf_module.goals.GoalNear(x, y, z, rng)
+        self._bot.pathfinder.setMovements(self.movements)
+        self._bot.pathfinder.setGoal(goal)
+        self.walkStartTime = time.time()
+
+    def isMoving(self) -> bool:
+        return self._bot.pathfinder.isMoving()
+
+    def stopPathfinder(self):
+        log.info("Stopping pathfinder")
+        self._bot.pathfinder.stop()
+
+    def ban(self, player: Player, reason: str = "good night"):
+        self.chat(f'/ban {player.username} {reason}')
+
+    def quit(self):
+        self._bot.quit('done')
+        os.kill(os.getpid(), 15)
+
+    ########## below here doesn't use self._bot ##########
+
+    def onLogin(self):
+        self.chat("Beware the termbanator")
+        # self.teleport(-160, 200, 280)
+
+    def walkTo(self, player: Player):
+        self.target = player
+        self.announcedToTarget = False
+        log.info(f'walk to {self.target}')
+        pos = player.position()
+        self.setPathfinderGoal(pos[0], pos[1], pos[2], rng=3)
+
+    def stillWalking(self, walkMinTime=3):
+        if self.walkStartTime and time.time(
+        ) < self.walkStartTime + walkMinTime:
+            return True
+        return self.isMoving()
+
+    def update(self, waitBetweenTargets=9):
+        try:
+            now = time.time()
+
+            if self.waitUntil > 0 and now < self.waitUntil:
+                log.debug(f'waiting {round(self.waitUntil-now, 1)} more sec')
+                return
+            self.waitUntil = 0
+
+            if self.stillWalking():
+                pos = self.me.position()
+                log.info(f'still moving to {self.target}: {pos=}')
+                return
+
+            self.stopPathfinder()
+
+            if self.target:
+                if not self.announcedToTarget:
+                    minsLeft = round(
+                        (self.getTodayEndTime() - time.time()) / 60, 1)
+
+                    if minsLeft > 0:
+                        mins = 'minutes' if minsLeft != 1 else 'minute'
+                        self.chat(
+                            f'hi {self.target.username} - {minsLeft} {mins} left'
+                        )
+                    else:
+                        self.ban(self.target)
+
+                    self.announcedToTarget = True
+                    self.sleepFor(waitBetweenTargets)
+                    return
+
+            log.info('follow next player')
+
+            next = players.after(self.target)
+            try:
+                self.walkTo(next)
+            except ValueError as e:
+                log.warning(repr(e))
+                self.sleepFor(8)
+        except EveryoneGone:
+            bot.quit()
+        except Exception as e:
+            traceback.print_exc()
+            log.warning(repr(e))
+            self.sleepFor(60)
+
+    def getTodayEndTime(self) -> float:
+        current_date = datetime.datetime.now().date()
+        end_datetime = datetime.datetime.combine(current_date,
+                                                 datetime.time(19, 0))
+
+        return end_datetime.timestamp()
+
+    def sleepFor(self, sec):
+        log.info(f'sleeping for {sec}')
+        self.waitUntil = time.time() + sec
+
+
+class EveryoneGone(ValueError):
+    pass
+
+
+class Players:
+
+    def __init__(self, mineflayer_bot: Any):
+        self._b = mineflayer_bot
+
+    def byName(self, name) -> Player:
+        # only works for 'nearby' players- useless
+        return Player(self._b.players[name])
+
+    def after(self, player: Optional[Player]) -> Player:
+        otherPlayers = [p for p in self._b.players if p != 'termbanator']
+        log.info(f'{otherPlayers=}')
+        if not otherPlayers:
+            log.info('everyone is gone')
+            raise EveryoneGone()
+        # import pdb;pdb.set_trace()
+        prevPlayer = player.username if player else None
+
+        try:
+            i = otherPlayers.index(prevPlayer)
+        except (AttributeError, ValueError):
+            i = 0
+        i = (i + 1) % len(otherPlayers)
+
+        return self.byName(otherPlayers[i])
+
+
+bot = Bot(mfbot)
+players = Players(mfbot)
+
+
+@On(mfbot, "login")
+def login(this):
+    bot.onLogin()
+
+
+@On(mfbot, 'chat')
+def handleMsg(this, sender, message, *args):
+    if sender and (sender != 'termbanator'):
+        pass
+
+
+@On(mfbot, 'time')
+def onTime(this):
+    bot.update()
+
+
+@On(mfbot, 'message')
+def message(emitter, msg, *args):
+    log.info(f'got msg event {msg.toString()}')
\ No newline at end of file