From a5d88da4a81a367a7eab982a7a62449e40dab69e Mon Sep 17 00:00:00 2001 From: Howard Abrams Date: Mon, 30 Jun 2025 22:17:59 -0700 Subject: [PATCH] Add Eliza-like NPCs Which should fit the vibe of this world. --- typeclasses/npcs.py | 273 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 270 insertions(+), 3 deletions(-) diff --git a/typeclasses/npcs.py b/typeclasses/npcs.py index ed7eb9b..b89cfcf 100755 --- a/typeclasses/npcs.py +++ b/typeclasses/npcs.py @@ -1,10 +1,13 @@ #!/usr/bin/env python -from random import randint -from re import split +from pathlib import Path +from random import randint, randrange, choice +from re import split, sub + +from evennia.utils import logger, delay -from typeclasses.objects import Object from commands.misc import CmdSetCat, CmdSetOctopus +from typeclasses.objects import Object from utils.word_list import routput @@ -129,3 +132,267 @@ class Octopus(Familiar): def at_object_creation(self): "Called when a octopus is first created." self.cmdset.add(CmdSetOctopus, persistent=True) + + +class Key: + """ + Internal representation of an input key for the Eliza class. + """ + def __init__(self, word, weight, decomps): + self.word = word + self.weight = weight + self.decomps = decomps + + +class Decomp: + """ + Internal representation of a subsection of the data file for Eliza class. + """ + def __init__(self, parts, save, reasmbs): + self.parts = parts + self.save = save + self.reasmbs = reasmbs + self.next_reasmb_index = 0 + + +class Eliza(NPC): + """ + Does it's best to respond and act like an NPC. + + Stateful representation of an NPC's communication. + Create with the command: + + @create/drop npc: typeclasses.npcs.Eliza + @set heron/data_file = "npcs.txt" + """ + def at_init(self): + """ + Called whenever typeclass is cached from memory, + at least once every server restart/reload. + """ + self.initials = [] + self.finals = [] + self.quits = [] + self.pres = {} + self.posts = {} + self.synons = {} + self.keys = {} + self.memory = [] + + data_file = self.db.data_file or "npcs.txt" + logger.info(f"Reading file, {data_file}") + self.load_responses(data_file) + + def other_say(self, speaker, speech): + if speech: + response = self.respond(speech) + if response: + msg = self.format_speech(response) + delay(1, self.location.msg_contents, msg) + + def format_speech(self, speech): + name = choice([ + self.key, + self.db._sdesc or self.name, + self.key.split(" ")[-1] + ]) + + if speech.endswith("?"): + vocalizes = choice(self.db.speech_ask_vocalizations or [ + "asks", + "questions" # etc. + ]) + elif speech.endswith("!"): + vocalizes = choice(self.db.speech_exclaim_vocalizations or [ + "exclaims", + "excitedly says" # etc. + ]) + else: + vocalizes = choice(self.db.speech_vocalizations or [ + "says", + "wryly says" # etc. + ]) + + return choice(self.db.speech_formats or [ + f"The {name} {vocalizes}, \"{speech}\"", + f"\"{speech}\" {vocalizes} the {name}." + ]) + + def load_responses(self, data_file): + key = None + decomp = None + + path = Path(__file__).with_name(data_file) + + with open(path) as file: + for line in file: + if not line.strip(): + continue + tag, content = [part.strip() for part in line.split(':')] + if tag == 'initial': + self.initials.append(content) + elif tag == 'final': + self.finals.append(content) + elif tag == 'quit': + self.quits.append(content) + elif tag == 'pre': + parts = content.split(' ') + self.pres[parts[0]] = parts[1:] + elif tag == 'post': + parts = content.split(' ') + self.posts[parts[0]] = parts[1:] + elif tag == 'synon': + parts = content.split(' ') + self.synons[parts[0]] = parts + elif tag == 'key': + parts = content.split(' ') + word = parts[0] + weight = int(parts[1]) if len(parts) > 1 else 1 + key = Key(word, weight, []) + self.keys[word] = key + elif tag == 'decomp': + parts = content.split(' ') + save = False + if parts[0] == '$': + save = True + parts = parts[1:] + decomp = Decomp(parts, save, []) + key.decomps.append(decomp) + elif tag == 'reasmb': + parts = content.split(' ') + decomp.reasmbs.append(parts) + + def _match_decomp_r(self, parts, words, results): + if not parts and not words: + return True + if not parts or (not words and parts != ['*']): + return False + if parts[0] == '*': + for index in range(len(words), -1, -1): + results.append(words[:index]) + if self._match_decomp_r(parts[1:], words[index:], results): + return True + results.pop() + return False + + if parts[0].startswith('@'): + root = parts[0][1:] + if root not in self.synons: + raise ValueError(f"Unknown synonym root {root}") + if not words[0].lower() in self.synons[root]: + return False + results.append([words[0]]) + return self._match_decomp_r(parts[1:], words[1:], results) + elif parts[0].lower() != words[0].lower(): + return False + else: + return self._match_decomp_r(parts[1:], words[1:], results) + + def _match_decomp(self, parts, words): + results = [] + if self._match_decomp_r(parts, words, results): + return results + return None + + def _next_reasmb(self, decomp): + index = decomp.next_reasmb_index + result = decomp.reasmbs[index % len(decomp.reasmbs)] + decomp.next_reasmb_index = index + 1 + return result + + def _reassemble(self, reasmb, results): + output = [] + for reword in reasmb: + if not reword: + continue + if reword[0] == '(' and reword[-1] == ')': + index = int(reword[1:-1]) + if index < 1 or index > len(results): + raise ValueError(f"Invalid result index {index}") + insert = results[index - 1] + for punct in [',', '.', ';']: + if punct in insert: + insert = insert[:insert.index(punct)] + output.extend(insert) + else: + output.append(reword) + return output + + def _sub(self, words, sub): + output = [] + for word in words: + word_lower = word.lower() + if word_lower in sub: + output.extend(sub[word_lower]) + else: + output.append(word) + return output + + def _match_key(self, words, key): + for decomp in key.decomps: + results = self._match_decomp(decomp.parts, words) + if results is None: + # logger.info(f"Decomp did not match: {decomp.parts}") + continue + # logger.info(f"Decomp matched: {decomp.parts}") + # logger.info(f"Decomp results: {results}") + results = [self._sub(words, self.posts) for words in results] + # logger.info(f"Decomp results after posts: {results}") + reasmb = self._next_reasmb(decomp) + # logger.info(f"Using reassembly: {reasmb}") + if reasmb[0] == 'goto': + goto_key = reasmb[1] + if goto_key not in self.keys: + raise ValueError("Invalid goto key {}".format(goto_key)) + # logger.info(f"Goto key: {goto_key}") + return self._match_key(words, self.keys[goto_key]) + output = self._reassemble(reasmb, results) + if decomp.save: + self.memory.append(output) + # logger.info(f"Saved to memory: {output}") + continue + return output + return None + + def respond(self, text): + if text.lower() in self.quits: + return None + + text = sub(r'\s*\.+\s*', ' . ', text) + text = sub(r'\s*,+\s*', ' , ', text) + text = sub(r'\s*;+\s*', ' ; ', text) + # logger.info(f"After punctuation cleanup: {text}") + + words = [w for w in text.split(' ') if w] + # logger.info(f"Input: {words}") + + words = self._sub(words, self.pres) + # logger.info(f"After pre-substitution: {words}") + + keys = [self.keys[w.lower()] for w in words if w.lower() in self.keys] + keys = sorted(keys, key=lambda k: -k.weight) + # logger.info(f"Sorted keys: {[(k.word, k.weight) for k in keys]}") + + output = None + + for key in keys: + output = self._match_key(words, key) + if output: + # logger.info(f"Output from key: {output}") + break + if not output: + if self.memory: + index = randrange(len(self.memory)) + output = self.memory.pop(index) + # logger.info(f"Output from memory: {output}") + else: + output = self._next_reasmb(self.keys['xnone'].decomps[0]) + # logger.info(f"Output from xnone: {output}") + + return " ".join(output) + + def initial(self): + return choice(self.initials) + + def final(self): + return choice(self.finals)