#!/usr/bin/env python from pathlib import Path from random import randint, randrange, choice from re import split, sub from evennia.utils import logger, delay from commands.misc import CmdSetCat, CmdSetOctopus, CmdSetFrog from typeclasses.objects import Object from utils.word_list import routput class NPC(Object): """ An NPC is an NPC because it can react to what it _hears_. To do this, implement 'at_heard_say', a function that returns a string for a response. """ def at_heard_say(self, message, from_obj, is_say=True, is_whisper=False): "Override to return a string in response to message." pass def msg(self, text=None, from_obj=None, **kwargs): "Custom msg() method reacting to say." # make sure to not repeat what we ourselves said or we'll # create a loop if from_obj != self: is_say = False is_whisper = False try: say_text, is_say = text[0], text[1]['type'] == 'say' is_whisper = text[1]['type'] == 'whisper' except Exception: pass try: # message will be on the form ` says, "say_text"` # we want to get only say_text without the quotes and any spaces message = text.split('says, ')[1].strip(' "') shout, response = self.at_heard_say(message, from_obj, is_say, is_whisper) if response != None: self.at_say(response, just_owner=(shout == "shout")) except Exception: pass # this is needed if anyone ever puppets this NPC - without it # you would never get any feedback from the server (not even # the results of look) super().msg(text=text, from_obj=from_obj, **kwargs) class CarriableNPC(NPC): """ A carriable NPC is like any other NPC, except that since it can be carried and isn't locked down, it can't hear conversation in a room. """ def at_say(self, message, msg_self=None, msg_location=None, receivers=None, msg_receivers=None, just_owner=True, **kwargs): "Does the best it can to speak out loud." owner = self.location if self.location.is_typeclass("typeclasses.rooms.Room"): super().at_say(message, msg_self=msg_self, msg_location=msg_location, receivers=receivers, msg_receivers=msg_receivers) elif just_owner: owner.msg(f"The {self.name} says, \"{message}\"") else: owner.msg( f"The {self.name}, you are carrying, says, \"{message}\"") owner.location.msg( f"The {self.name}, carried by {self.location.name}, says, \"{message}\"", exclude=owner) class Familiar(NPC): """ Parent class for all familiars. These are pets that can be controlled by their owner to do antics. """ def at_do(self, owner, antics, alias=None): """ Issue a 'send_emote' into the room with a antic. """ if antics.startswith("'"): owner.announce_action(f"$Your() {self.get_name(alias)}{antics}") else: owner.announce_action(f"$Your() {self.get_name(alias)} {antics}") class Cat(Familiar): """ A puppetable 'cat' that acts based on that 'command'. """ def at_object_creation(self): "Called when a cat is first created." self.cmdset.add(CmdSetCat, persistent=True) class Octopus(Familiar): """ A puppetable 'octopus' that acts based on that 'command'. """ def at_object_creation(self): "Called when a octopus is first created." self.cmdset.add(CmdSetOctopus, persistent=True) class Frog(Familiar): """ A puppetable 'frog' that acts based on that 'command'. """ def at_object_creation(self): "Called when this frog is first created." self.cmdset.add(CmdSetFrog, persistent=True) class Key: """ Internal representation of an input key for the Eliza class. """ def __init__(self, word, weight, decomps): self.word = word self.weight = weight self.decomps = decomps class Decomp: """ Internal representation of a subsection of the data file for Eliza class. """ def __init__(self, parts, save, reasmbs): self.parts = parts self.save = save self.reasmbs = reasmbs self.next_reasmb_index = 0 class Eliza(NPC): """ Does it's best to respond and act like an NPC. Stateful representation of an NPC's communication. Create with the command: @create/drop npc: typeclasses.npcs.Eliza @set heron/data_file = "npcs.txt" """ def at_init(self): """ Called whenever typeclass is cached from memory, at least once every server restart/reload. """ self.initials = [] self.finals = [] self.quits = [] self.pres = {} self.posts = {} self.synons = {} self.keys = {} self.memory = [] data_file = self.db.data_file or "npcs.txt" logger.info(f"Reading file, {data_file}") self.load_responses(data_file) def other_say(self, speaker, speech): if speech: response = self.respond(speech) if response: msg = self.format_speech(response) delay(1, self.location.msg_contents, msg) def format_speech(self, speech): name = choice([ self.key, self.db._sdesc or self.name, self.key.split(" ")[-1] ]) if speech.endswith("?"): vocalizes = choice(self.db.speech_ask_vocalizations or [ "asks", "questions" # etc. ]) elif speech.endswith("!"): vocalizes = choice(self.db.speech_exclaim_vocalizations or [ "exclaims", "excitedly says" # etc. ]) else: vocalizes = choice(self.db.speech_vocalizations or [ "says", "wryly says" # etc. ]) return choice(self.db.speech_formats or [ f"The {name} {vocalizes}, \"{speech}\"", f"\"{speech}\" {vocalizes} the {name}." ]) def load_responses(self, data_file): key = None decomp = None path = Path(__file__).with_name(data_file) with open(path) as file: for line in file: if not line.strip(): continue tag, content = [part.strip() for part in line.split(':')] if tag == 'initial': self.initials.append(content) elif tag == 'final': self.finals.append(content) elif tag == 'quit': self.quits.append(content) elif tag == 'pre': parts = content.split(' ') self.pres[parts[0]] = parts[1:] elif tag == 'post': parts = content.split(' ') self.posts[parts[0]] = parts[1:] elif tag == 'synon': parts = content.split(' ') self.synons[parts[0]] = parts elif tag == 'key': parts = content.split(' ') word = parts[0] weight = int(parts[1]) if len(parts) > 1 else 1 key = Key(word, weight, []) self.keys[word] = key elif tag == 'decomp': parts = content.split(' ') save = False if parts[0] == '$': save = True parts = parts[1:] decomp = Decomp(parts, save, []) key.decomps.append(decomp) elif tag == 'reasmb': parts = content.split(' ') decomp.reasmbs.append(parts) def _match_decomp_r(self, parts, words, results): if not parts and not words: return True if not parts or (not words and parts != ['*']): return False if parts[0] == '*': for index in range(len(words), -1, -1): results.append(words[:index]) if self._match_decomp_r(parts[1:], words[index:], results): return True results.pop() return False if parts[0].startswith('@'): root = parts[0][1:] if root not in self.synons: raise ValueError(f"Unknown synonym root {root}") if not words[0].lower() in self.synons[root]: return False results.append([words[0]]) return self._match_decomp_r(parts[1:], words[1:], results) elif parts[0].lower() != words[0].lower(): return False else: return self._match_decomp_r(parts[1:], words[1:], results) def _match_decomp(self, parts, words): results = [] if self._match_decomp_r(parts, words, results): return results return None def _next_reasmb(self, decomp): index = decomp.next_reasmb_index result = decomp.reasmbs[index % len(decomp.reasmbs)] decomp.next_reasmb_index = index + 1 return result def _reassemble(self, reasmb, results): output = [] for reword in reasmb: if not reword: continue if reword[0] == '(' and reword[-1] == ')': index = int(reword[1:-1]) if index < 1 or index > len(results): raise ValueError(f"Invalid result index {index}") insert = results[index - 1] for punct in [',', '.', ';']: if punct in insert: insert = insert[:insert.index(punct)] output.extend(insert) else: output.append(reword) return output def _sub(self, words, sub): output = [] for word in words: word_lower = word.lower() if word_lower in sub: output.extend(sub[word_lower]) else: output.append(word) return output def _match_key(self, words, key): for decomp in key.decomps: results = self._match_decomp(decomp.parts, words) if results is None: # logger.info(f"Decomp did not match: {decomp.parts}") continue # logger.info(f"Decomp matched: {decomp.parts}") # logger.info(f"Decomp results: {results}") results = [self._sub(words, self.posts) for words in results] # logger.info(f"Decomp results after posts: {results}") reasmb = self._next_reasmb(decomp) # logger.info(f"Using reassembly: {reasmb}") if reasmb[0] == 'goto': goto_key = reasmb[1] if goto_key not in self.keys: raise ValueError("Invalid goto key {}".format(goto_key)) # logger.info(f"Goto key: {goto_key}") return self._match_key(words, self.keys[goto_key]) output = self._reassemble(reasmb, results) if decomp.save: self.memory.append(output) # logger.info(f"Saved to memory: {output}") continue return output return None def respond(self, text): if text.lower() in self.quits: return None text = sub(r'\s*\.+\s*', ' . ', text) text = sub(r'\s*,+\s*', ' , ', text) text = sub(r'\s*;+\s*', ' ; ', text) # logger.info(f"After punctuation cleanup: {text}") words = [w for w in text.split(' ') if w] # logger.info(f"Input: {words}") words = self._sub(words, self.pres) # logger.info(f"After pre-substitution: {words}") keys = [self.keys[w.lower()] for w in words if w.lower() in self.keys] keys = sorted(keys, key=lambda k: -k.weight) # logger.info(f"Sorted keys: {[(k.word, k.weight) for k in keys]}") output = None for key in keys: output = self._match_key(words, key) if output: # logger.info(f"Output from key: {output}") break if not output: if self.memory: index = randrange(len(self.memory)) output = self.memory.pop(index) # logger.info(f"Output from memory: {output}") else: output = self._next_reasmb(self.keys['xnone'].decomps[0]) # logger.info(f"Output from xnone: {output}") return " ".join(output) def initial(self): return choice(self.initials) def final(self): return choice(self.finals)