moss-n-puddles/typeclasses/npcs.py
Howard Abrams b33f6ca7df Can make pets into familiars
Refactor get_name to a top-level place on the Object.
2026-02-24 21:44:09 -08:00

385 lines
13 KiB
Python
Executable file

#!/usr/bin/env python
from pathlib import Path
from random import randint, randrange, choice
from re import split, sub
from evennia.utils import logger, delay
from commands.misc import CmdSetCat, CmdSetOctopus, CmdSetFrog
from typeclasses.objects import Object
from utils.word_list import routput
class NPC(Object):
"""
An NPC is an NPC because it can react to what it _hears_.
To do this, implement 'at_heard_say', a function that
returns a string for a response.
"""
def at_heard_say(self, message, from_obj,
is_say=True, is_whisper=False):
"Override to return a string in response to message."
pass
def msg(self, text=None, from_obj=None, **kwargs):
"Custom msg() method reacting to say."
# make sure to not repeat what we ourselves said or we'll
# create a loop
if from_obj != self:
is_say = False
is_whisper = False
try:
say_text, is_say = text[0], text[1]['type'] == 'say'
is_whisper = text[1]['type'] == 'whisper'
except Exception:
pass
try:
# message will be on the form `<Person> says, "say_text"`
# we want to get only say_text without the quotes and any spaces
message = text.split('says, ')[1].strip(' "')
shout, response = self.at_heard_say(message, from_obj,
is_say, is_whisper)
if response != None:
self.at_say(response, just_owner=(shout == "shout"))
except Exception:
pass
# this is needed if anyone ever puppets this NPC - without it
# you would never get any feedback from the server (not even
# the results of look)
super().msg(text=text, from_obj=from_obj, **kwargs)
class CarriableNPC(NPC):
"""
A carriable NPC is like any other NPC, except that since it can
be carried and isn't locked down, it can't hear conversation in a
room.
"""
def at_say(self, message, msg_self=None, msg_location=None,
receivers=None, msg_receivers=None, just_owner=True,
**kwargs):
"Does the best it can to speak out loud."
owner = self.location
if self.location.is_typeclass("typeclasses.rooms.Room"):
super().at_say(message, msg_self=msg_self,
msg_location=msg_location,
receivers=receivers,
msg_receivers=msg_receivers)
elif just_owner:
owner.msg(f"The {self.name} says, \"{message}\"")
else:
owner.msg(
f"The {self.name}, you are carrying, says, \"{message}\"")
owner.location.msg(
f"The {self.name}, carried by {self.location.name}, says, \"{message}\"", exclude=owner)
class Familiar(NPC):
"""
Parent class for all familiars.
These are pets that can be controlled by their owner to do antics.
"""
def at_do(self, owner, antics, alias=None):
"""
Issue a 'send_emote' into the room with a antic.
"""
if antics.startswith("'"):
owner.announce_action(f"$Your() {self.get_name(alias)}{antics}")
else:
owner.announce_action(f"$Your() {self.get_name(alias)} {antics}")
class Cat(Familiar):
"""
A puppetable 'cat' that acts based on that 'command'.
"""
def at_object_creation(self):
"Called when a cat is first created."
self.cmdset.add(CmdSetCat, persistent=True)
class Octopus(Familiar):
"""
A puppetable 'octopus' that acts based on that 'command'.
"""
def at_object_creation(self):
"Called when a octopus is first created."
self.cmdset.add(CmdSetOctopus, persistent=True)
class Frog(Familiar):
"""
A puppetable 'frog' that acts based on that 'command'.
"""
def at_object_creation(self):
"Called when this frog is first created."
self.cmdset.add(CmdSetFrog, persistent=True)
class Key:
"""
Internal representation of an input key for the Eliza class.
"""
def __init__(self, word, weight, decomps):
self.word = word
self.weight = weight
self.decomps = decomps
class Decomp:
"""
Internal representation of a subsection of the data file for Eliza class.
"""
def __init__(self, parts, save, reasmbs):
self.parts = parts
self.save = save
self.reasmbs = reasmbs
self.next_reasmb_index = 0
class Eliza(NPC):
"""
Does it's best to respond and act like an NPC.
Stateful representation of an NPC's communication.
Create with the command:
@create/drop npc: typeclasses.npcs.Eliza
@set heron/data_file = "npcs.txt"
"""
def at_init(self):
"""
Called whenever typeclass is cached from memory,
at least once every server restart/reload.
"""
self.initials = []
self.finals = []
self.quits = []
self.pres = {}
self.posts = {}
self.synons = {}
self.keys = {}
self.memory = []
data_file = self.db.data_file or "npcs.txt"
logger.info(f"Reading file, {data_file}")
self.load_responses(data_file)
def other_say(self, speaker, speech):
if speech:
response = self.respond(speech)
if response:
msg = self.format_speech(response)
delay(1, self.location.msg_contents, msg)
def format_speech(self, speech):
name = choice([
self.key,
self.db._sdesc or self.name,
self.key.split(" ")[-1]
])
if speech.endswith("?"):
vocalizes = choice(self.db.speech_ask_vocalizations or [
"asks",
"questions" # etc.
])
elif speech.endswith("!"):
vocalizes = choice(self.db.speech_exclaim_vocalizations or [
"exclaims",
"excitedly says" # etc.
])
else:
vocalizes = choice(self.db.speech_vocalizations or [
"says",
"wryly says" # etc.
])
return choice(self.db.speech_formats or [
f"The {name} {vocalizes}, \"{speech}\"",
f"\"{speech}\" {vocalizes} the {name}."
])
def load_responses(self, data_file):
key = None
decomp = None
path = Path(__file__).with_name(data_file)
with open(path) as file:
for line in file:
if not line.strip():
continue
tag, content = [part.strip() for part in line.split(':')]
if tag == 'initial':
self.initials.append(content)
elif tag == 'final':
self.finals.append(content)
elif tag == 'quit':
self.quits.append(content)
elif tag == 'pre':
parts = content.split(' ')
self.pres[parts[0]] = parts[1:]
elif tag == 'post':
parts = content.split(' ')
self.posts[parts[0]] = parts[1:]
elif tag == 'synon':
parts = content.split(' ')
self.synons[parts[0]] = parts
elif tag == 'key':
parts = content.split(' ')
word = parts[0]
weight = int(parts[1]) if len(parts) > 1 else 1
key = Key(word, weight, [])
self.keys[word] = key
elif tag == 'decomp':
parts = content.split(' ')
save = False
if parts[0] == '$':
save = True
parts = parts[1:]
decomp = Decomp(parts, save, [])
key.decomps.append(decomp)
elif tag == 'reasmb':
parts = content.split(' ')
decomp.reasmbs.append(parts)
def _match_decomp_r(self, parts, words, results):
if not parts and not words:
return True
if not parts or (not words and parts != ['*']):
return False
if parts[0] == '*':
for index in range(len(words), -1, -1):
results.append(words[:index])
if self._match_decomp_r(parts[1:], words[index:], results):
return True
results.pop()
return False
if parts[0].startswith('@'):
root = parts[0][1:]
if root not in self.synons:
raise ValueError(f"Unknown synonym root {root}")
if not words[0].lower() in self.synons[root]:
return False
results.append([words[0]])
return self._match_decomp_r(parts[1:], words[1:], results)
elif parts[0].lower() != words[0].lower():
return False
else:
return self._match_decomp_r(parts[1:], words[1:], results)
def _match_decomp(self, parts, words):
results = []
if self._match_decomp_r(parts, words, results):
return results
return None
def _next_reasmb(self, decomp):
index = decomp.next_reasmb_index
result = decomp.reasmbs[index % len(decomp.reasmbs)]
decomp.next_reasmb_index = index + 1
return result
def _reassemble(self, reasmb, results):
output = []
for reword in reasmb:
if not reword:
continue
if reword[0] == '(' and reword[-1] == ')':
index = int(reword[1:-1])
if index < 1 or index > len(results):
raise ValueError(f"Invalid result index {index}")
insert = results[index - 1]
for punct in [',', '.', ';']:
if punct in insert:
insert = insert[:insert.index(punct)]
output.extend(insert)
else:
output.append(reword)
return output
def _sub(self, words, sub):
output = []
for word in words:
word_lower = word.lower()
if word_lower in sub:
output.extend(sub[word_lower])
else:
output.append(word)
return output
def _match_key(self, words, key):
for decomp in key.decomps:
results = self._match_decomp(decomp.parts, words)
if results is None:
# logger.info(f"Decomp did not match: {decomp.parts}")
continue
# logger.info(f"Decomp matched: {decomp.parts}")
# logger.info(f"Decomp results: {results}")
results = [self._sub(words, self.posts) for words in results]
# logger.info(f"Decomp results after posts: {results}")
reasmb = self._next_reasmb(decomp)
# logger.info(f"Using reassembly: {reasmb}")
if reasmb[0] == 'goto':
goto_key = reasmb[1]
if goto_key not in self.keys:
raise ValueError("Invalid goto key {}".format(goto_key))
# logger.info(f"Goto key: {goto_key}")
return self._match_key(words, self.keys[goto_key])
output = self._reassemble(reasmb, results)
if decomp.save:
self.memory.append(output)
# logger.info(f"Saved to memory: {output}")
continue
return output
return None
def respond(self, text):
if text.lower() in self.quits:
return None
text = sub(r'\s*\.+\s*', ' . ', text)
text = sub(r'\s*,+\s*', ' , ', text)
text = sub(r'\s*;+\s*', ' ; ', text)
# logger.info(f"After punctuation cleanup: {text}")
words = [w for w in text.split(' ') if w]
# logger.info(f"Input: {words}")
words = self._sub(words, self.pres)
# logger.info(f"After pre-substitution: {words}")
keys = [self.keys[w.lower()] for w in words if w.lower() in self.keys]
keys = sorted(keys, key=lambda k: -k.weight)
# logger.info(f"Sorted keys: {[(k.word, k.weight) for k in keys]}")
output = None
for key in keys:
output = self._match_key(words, key)
if output:
# logger.info(f"Output from key: {output}")
break
if not output:
if self.memory:
index = randrange(len(self.memory))
output = self.memory.pop(index)
# logger.info(f"Output from memory: {output}")
else:
output = self._next_reasmb(self.keys['xnone'].decomps[0])
# logger.info(f"Output from xnone: {output}")
return " ".join(output)
def initial(self):
return choice(self.initials)
def final(self):
return choice(self.finals)