view termbanator.py @ 0:f570713a7d31 default tip

start termbanator
author drewp@bigasterisk.com
date Sun, 25 Jun 2023 20:01:38 -0700
parents
children
line wrap: on
line source

import datetime
import os
import pdb
import subprocess
import sys
import time
import logging
import traceback
from typing import Any, Optional, Tuple

from javascript import On, Once, console, require

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] [line %(lineno)s] %(message)s")
log = logging.getLogger()

mineflayer = require("mineflayer", "latest")
Vec3 = require("vec3").Vec3
registry = require('prismarine-registry')('1.19.3')
Block = require("prismarine-block")(registry)

host, port, rconPort = ('bang', 25665, 25675)

mfbot = mineflayer.createBot({
    "host": host,
    "port": port,
    "username": 'termbanator',
    'auth': 'offline',
    'version': '1.19.3',
})
log.info('created')


class Player:

    def __init__(self, player):
        self._p = player
        if self._p is None:
            raise TypeError(f'player was {player}')
        self.username = self._p.username

    def __repr__(self):
        return f'Player({self.username})'

    def position(self) -> Tuple[float, float, float]:
        if self._p.entity is None:
            return getDistantPlayerPos(self.username)
        pos = self._p.entity.position
        return (round(pos.x, 2), round(pos.y, 2), round(pos.z, 2))


def getDistantPlayerPos(username) -> Tuple[float, float, float]:
    out = subprocess.check_output([
        "/home/drewp/Downloads/mcrcon/mcrcon", "-H", host, "-P",
        str(rconPort), "-p", "111", f"/data get entity @p[name={username}] Pos"
    ],
                                  encoding='utf8')
    words = out.split("the following entity data: ")[1].split('[')[1].split(
        ']')[0].split(',')
    return tuple(round(float(v.rstrip('d')), 2) for v in words)


class Bot:

    def __init__(self, mineflayer_bot: Any):
        self._bot = mineflayer_bot
        self.me = Player(self._bot)
        mcData = require('minecraft-data')(self._bot.version)
        self.pf_module = require('mineflayer-pathfinder')
        self._bot.loadPlugin(self.pf_module.pathfinder)

        self.movements = self.pf_module.Movements(self._bot, mcData)
        self.movements.allow1by1towers = True
        self.movements.canDig = False
        self.movements.scafoldingBlocks.push(
            self._bot.registry.itemsByName['dirt'].id)
        self.movements.maxDropDown = 300
        self.movements.allowFreeMotion = True
        self.movements.canOpenDoors = True

        self._bot.removeAllListeners('chat')

        self.walkStartTime = 0
        self.waitUntil = time.time() + 2
        self.target: Optional[Player] = None
        self.announcedToTarget = False

    def chat(self, txt):
        log.info(f'say {txt!r}')
        self._bot.chat(txt)

    def teleport(self, x, y, z):
        self._bot.chat(f'/tp {x} {y} {z}')

    def setPathfinderGoal(self, x, y, z, rng):
        pos = self.me.position()
        log.info(f'now at {pos}, pathfinding to {x} {y} {z} {rng=}')
        goal = self.pf_module.goals.GoalNear(x, y, z, rng)
        self._bot.pathfinder.setMovements(self.movements)
        self._bot.pathfinder.setGoal(goal)
        self.walkStartTime = time.time()

    def isMoving(self) -> bool:
        return self._bot.pathfinder.isMoving()

    def stopPathfinder(self):
        log.info("Stopping pathfinder")
        self._bot.pathfinder.stop()

    def ban(self, player: Player, reason: str = "good night"):
        self.chat(f'/ban {player.username} {reason}')

    def quit(self):
        self._bot.quit('done')
        os.kill(os.getpid(), 15)

    ########## below here doesn't use self._bot ##########

    def onLogin(self):
        self.chat("Beware the termbanator")
        # self.teleport(-160, 200, 280)

    def walkTo(self, player: Player):
        self.target = player
        self.announcedToTarget = False
        log.info(f'walk to {self.target}')
        pos = player.position()
        self.setPathfinderGoal(pos[0], pos[1], pos[2], rng=3)

    def stillWalking(self, walkMinTime=3):
        if self.walkStartTime and time.time(
        ) < self.walkStartTime + walkMinTime:
            return True
        return self.isMoving()

    def update(self, waitBetweenTargets=9):
        try:
            now = time.time()

            if self.waitUntil > 0 and now < self.waitUntil:
                log.debug(f'waiting {round(self.waitUntil-now, 1)} more sec')
                return
            self.waitUntil = 0

            if self.stillWalking():
                pos = self.me.position()
                log.info(f'still moving to {self.target}: {pos=}')
                return

            self.stopPathfinder()

            if self.target:
                if not self.announcedToTarget:
                    minsLeft = round(
                        (self.getTodayEndTime() - time.time()) / 60, 1)

                    if minsLeft > 0:
                        mins = 'minutes' if minsLeft != 1 else 'minute'
                        self.chat(
                            f'hi {self.target.username} - {minsLeft} {mins} left'
                        )
                    else:
                        self.ban(self.target)

                    self.announcedToTarget = True
                    self.sleepFor(waitBetweenTargets)
                    return

            log.info('follow next player')

            next = players.after(self.target)
            try:
                self.walkTo(next)
            except ValueError as e:
                log.warning(repr(e))
                self.sleepFor(8)
        except EveryoneGone:
            bot.quit()
        except Exception as e:
            traceback.print_exc()
            log.warning(repr(e))
            self.sleepFor(60)

    def getTodayEndTime(self) -> float:
        current_date = datetime.datetime.now().date()
        end_datetime = datetime.datetime.combine(current_date,
                                                 datetime.time(19, 0))

        return end_datetime.timestamp()

    def sleepFor(self, sec):
        log.info(f'sleeping for {sec}')
        self.waitUntil = time.time() + sec


class EveryoneGone(ValueError):
    pass


class Players:

    def __init__(self, mineflayer_bot: Any):
        self._b = mineflayer_bot

    def byName(self, name) -> Player:
        # only works for 'nearby' players- useless
        return Player(self._b.players[name])

    def after(self, player: Optional[Player]) -> Player:
        otherPlayers = [p for p in self._b.players if p != 'termbanator']
        log.info(f'{otherPlayers=}')
        if not otherPlayers:
            log.info('everyone is gone')
            raise EveryoneGone()
        # import pdb;pdb.set_trace()
        prevPlayer = player.username if player else None

        try:
            i = otherPlayers.index(prevPlayer)
        except (AttributeError, ValueError):
            i = 0
        i = (i + 1) % len(otherPlayers)

        return self.byName(otherPlayers[i])


bot = Bot(mfbot)
players = Players(mfbot)


@On(mfbot, "login")
def login(this):
    bot.onLogin()


@On(mfbot, 'chat')
def handleMsg(this, sender, message, *args):
    if sender and (sender != 'termbanator'):
        pass


@On(mfbot, 'time')
def onTime(this):
    bot.update()


@On(mfbot, 'message')
def message(emitter, msg, *args):
    log.info(f'got msg event {msg.toString()}')