Add Eliza-like NPCs

These should fit the vibe of this world.
This commit is contained in:
Howard Abrams 2025-06-30 22:17:59 -07:00
parent aeea787887
commit a5d88da4a8

View file

@@ -1,10 +1,13 @@
#!/usr/bin/env python
from random import randint
from re import split
from pathlib import Path
from random import randint, randrange, choice
from re import split, sub
from evennia.utils import logger, delay
from typeclasses.objects import Object
from commands.misc import CmdSetCat, CmdSetOctopus
from typeclasses.objects import Object
from utils.word_list import routput
@@ -129,3 +132,267 @@ class Octopus(Familiar):
def at_object_creation(self):
    """Called once, when an octopus is first created: attach its command set."""
    # persistent=True keeps the cmdset across server reloads.
    self.cmdset.add(CmdSetOctopus, persistent=True)
class Key:
    """
    Internal representation of an input key for the Eliza class.

    Attributes:
        word: the keyword (lower-case) that triggers this key.
        weight: precedence; higher-weight keys are matched first.
        decomps: list of Decomp rules attached to this key.
    """

    def __init__(self, word, weight, decomps):
        self.word = word
        self.weight = weight
        self.decomps = decomps

    def __repr__(self):
        # Debug-friendly representation; decomps elided to keep output short.
        return f"Key(word={self.word!r}, weight={self.weight!r}, decomps=<{len(self.decomps)} rules>)"
class Decomp:
    """
    Internal representation of a subsection of the data file for Eliza class.

    Attributes:
        parts: decomposition pattern tokens ('*', '@root', or literal words).
        save: True when matched output should be stored in memory ('$' flag).
        reasmbs: reassembly rules, each a list of word tokens.
        next_reasmb_index: rotating cursor so replies cycle instead of repeat.
    """

    def __init__(self, parts, save, reasmbs):
        self.parts = parts
        self.save = save
        self.reasmbs = reasmbs
        self.next_reasmb_index = 0

    def __repr__(self):
        # Debug-friendly representation; reassembly bodies elided for brevity.
        return f"Decomp(parts={self.parts!r}, save={self.save!r}, reasmbs=<{len(self.reasmbs)}>)"
class Eliza(NPC):
"""
Does its best to respond and act like an NPC.
Stateful representation of an NPC's communication.
Create with the command:
@create/drop npc: typeclasses.npcs.Eliza
@set npc/data_file = "npcs.txt"
"""
def at_init(self):
    """
    Reset conversation state and (re)load the response script.

    Evennia calls this whenever the typeclass is cached from memory,
    at least once every server restart/reload.
    """
    # Script-driven tables, all refilled by load_responses().
    self.initials, self.finals, self.quits = [], [], []
    self.pres, self.posts = {}, {}
    self.synons, self.keys = {}, {}
    # Saved replies used later as fallbacks when nothing matches.
    self.memory = []
    script = self.db.data_file or "npcs.txt"
    logger.info(f"Reading file, {script}")
    self.load_responses(script)
def other_say(self, speaker, speech):
    """React to overheard speech: compose a reply and say it after a beat."""
    if not speech:
        return
    reply = self.respond(speech)
    if not reply:
        return
    # A one-second pause makes the NPC feel less mechanical.
    delay(1, self.location.msg_contents, self.format_speech(reply))
def format_speech(self, speech):
    """Wrap *speech* in a third-person 'The <name> says, ...' sentence."""
    # Vary how the NPC is referred to: full key, short description,
    # or just the last word of the key.
    name = choice([
        self.key,
        self.db._sdesc or self.name,
        self.key.split(" ")[-1]
    ])
    # Pick a speech verb pool based on the reply's final punctuation;
    # db attributes let builders override the defaults.
    if speech.endswith("?"):
        pool = self.db.speech_ask_vocalizations or ["asks", "questions"]
    elif speech.endswith("!"):
        pool = self.db.speech_exclaim_vocalizations or ["exclaims", "excitedly says"]
    else:
        pool = self.db.speech_vocalizations or ["says", "wryly says"]
    vocalizes = choice(pool)
    templates = self.db.speech_formats or [
        f"The {name} {vocalizes}, \"{speech}\"",
        f"\"{speech}\" {vocalizes} the {name}."
    ]
    return choice(templates)
def load_responses(self, data_file):
    """
    Parse an Eliza-style script file into the instance tables.

    The file lives next to this module and contains ``tag: content``
    lines, where tag is one of initial, final, quit, pre, post, synon,
    key, decomp or reasmb.  Blank lines are skipped.

    Args:
        data_file (str): file name, resolved relative to this module.

    Raises:
        ValueError: if a decomp appears before any key, or a reasmb
            before any decomp.
    """
    key = None
    decomp = None
    path = Path(__file__).with_name(data_file)
    with open(path) as file:
        for line in file:
            if not line.strip():
                continue
            # Split only on the FIRST colon so the content itself may
            # contain colons (the old unbounded split crashed on them).
            tag, content = [part.strip() for part in line.split(':', 1)]
            if tag == 'initial':
                self.initials.append(content)
            elif tag == 'final':
                self.finals.append(content)
            elif tag == 'quit':
                self.quits.append(content)
            elif tag == 'pre':
                # pre: FROM TO... -- substituted before matching.
                parts = content.split(' ')
                self.pres[parts[0]] = parts[1:]
            elif tag == 'post':
                # post: FROM TO... -- substituted into captured groups.
                parts = content.split(' ')
                self.posts[parts[0]] = parts[1:]
            elif tag == 'synon':
                # First word is the root; the whole list (root included)
                # is what an '@root' pattern token matches.
                parts = content.split(' ')
                self.synons[parts[0]] = parts
            elif tag == 'key':
                parts = content.split(' ')
                word = parts[0]
                # Optional second field is the key's weight (default 1).
                weight = int(parts[1]) if len(parts) > 1 else 1
                key = Key(word, weight, [])
                self.keys[word] = key
            elif tag == 'decomp':
                if key is None:
                    raise ValueError(f"decomp before any key in {data_file}")
                parts = content.split(' ')
                save = False
                # A leading '$' flags decompositions whose output is
                # memorised instead of spoken immediately.
                if parts[0] == '$':
                    save = True
                    parts = parts[1:]
                decomp = Decomp(parts, save, [])
                key.decomps.append(decomp)
            elif tag == 'reasmb':
                if decomp is None:
                    raise ValueError(f"reasmb before any decomp in {data_file}")
                parts = content.split(' ')
                decomp.reasmbs.append(parts)
def _match_decomp_r(self, parts, words, results):
    """
    Recursively match decomposition pattern *parts* against *words*.

    Captured groups are appended to *results* (one list of words per
    capturing pattern element) and popped again on backtracking.
    Returns True when the whole pattern consumes all the words.
    """
    # Both exhausted: a complete match.
    if not parts and not words:
        return True
    # Pattern exhausted, or words exhausted with more than a trailing
    # wildcard left to satisfy: no match.
    if not parts or (not words and parts != ['*']):
        return False
    if parts[0] == '*':
        # Wildcard: try the longest capture first, backtracking on failure.
        for index in range(len(words), -1, -1):
            results.append(words[:index])
            if self._match_decomp_r(parts[1:], words[index:], results):
                return True
            results.pop()
        return False
    if parts[0].startswith('@'):
        # '@root' matches any single word listed under that synonym root.
        root = parts[0][1:]
        if root not in self.synons:
            raise ValueError(f"Unknown synonym root {root}")
        if not words[0].lower() in self.synons[root]:
            return False
        results.append([words[0]])
        return self._match_decomp_r(parts[1:], words[1:], results)
    elif parts[0].lower() != words[0].lower():
        # Literal token mismatch (case-insensitive).
        return False
    else:
        # Literal token matched; continue with the rest.
        return self._match_decomp_r(parts[1:], words[1:], results)
def _match_decomp(self, parts, words):
    """Return the captured groups if *parts* matches *words*, else None."""
    captured = []
    matched = self._match_decomp_r(parts, words, captured)
    return captured if matched else None
def _next_reasmb(self, decomp):
    """Cycle through a decomposition's reassembly rules in order."""
    rules = decomp.reasmbs
    chosen = rules[decomp.next_reasmb_index % len(rules)]
    # Advance the cursor so repeated matches vary their replies.
    decomp.next_reasmb_index += 1
    return chosen
def _reassemble(self, reasmb, results):
    """
    Build the reply word-list from a reassembly rule.

    Tokens of the form ``(N)`` are replaced by the N-th captured group
    (1-based), truncated at the first comma/period/semicolon; all other
    tokens are copied through verbatim.
    """
    output = []
    for reword in reasmb:
        if not reword:
            continue
        if reword[0] != '(' or reword[-1] != ')':
            # Plain token: copy as-is.
            output.append(reword)
            continue
        index = int(reword[1:-1])
        if index < 1 or index > len(results):
            raise ValueError(f"Invalid result index {index}")
        insert = results[index - 1]
        # Drop everything after the first punctuation mark in the capture.
        for punct in [',', '.', ';']:
            if punct in insert:
                insert = insert[:insert.index(punct)]
        output.extend(insert)
    return output
def _sub(self, words, sub):
    """
    Apply a word-substitution table (pres/posts) to a list of words.

    Each word whose lower-case form appears in *sub* is replaced by its
    list of replacement words; unknown words pass through untouched.
    """
    # NB: parameter name 'sub' shadows re.sub at module level, which is
    # not used in this method.
    return [token
            for word in words
            for token in sub.get(word.lower(), [word])]
def _match_key(self, words, key):
    """
    Try every decomposition of *key* against *words*.

    Returns the reassembled reply (list of words), or None when no
    decomposition matched.  '$'-flagged decompositions stash their
    output in self.memory instead of replying immediately; a 'goto'
    reassembly restarts matching at another key.
    """
    for decomp in key.decomps:
        results = self._match_decomp(decomp.parts, words)
        if results is None:
            continue
        # Apply post-substitutions to each captured group
        # (the comprehension deliberately rebinds 'words' locally).
        results = [self._sub(words, self.posts) for words in results]
        reasmb = self._next_reasmb(decomp)
        if reasmb[0] == 'goto':
            # Delegate entirely to another key's decompositions.
            goto_key = reasmb[1]
            if goto_key not in self.keys:
                raise ValueError("Invalid goto key {}".format(goto_key))
            return self._match_key(words, self.keys[goto_key])
        output = self._reassemble(reasmb, results)
        if decomp.save:
            # Save for later instead of replying; keep trying decomps.
            self.memory.append(output)
            continue
        return output
    return None
def respond(self, text):
    """
    Produce a reply to *text*, or None when the input is a quit phrase.

    Pipeline: isolate punctuation, pre-substitute words, then try the
    keys present in the input in descending weight order; fall back to
    a remembered reply, or the 'xnone' catch-all.
    """
    if text.lower() in self.quits:
        return None
    # Pad punctuation with spaces so it tokenises as separate words.
    text = sub(r'\s*\.+\s*', ' . ', text)
    text = sub(r'\s*,+\s*', ' , ', text)
    text = sub(r'\s*;+\s*', ' ; ', text)
    words = [w for w in text.split(' ') if w]
    words = self._sub(words, self.pres)
    # Collect every key that occurs in the input, highest weight first.
    keys = [self.keys[w.lower()] for w in words if w.lower() in self.keys]
    keys = sorted(keys, key=lambda k: -k.weight)
    output = None
    for key in keys:
        output = self._match_key(words, key)
        if output:
            break
    if not output:
        if self.memory:
            # Replay (and consume) a randomly chosen remembered reply.
            index = randrange(len(self.memory))
            output = self.memory.pop(index)
        else:
            # 'xnone' holds the generic catch-all responses.
            output = self._next_reasmb(self.keys['xnone'].decomps[0])
    return " ".join(output)
def initial(self):
    """Return a random opening line from the loaded script."""
    openers = self.initials
    return choice(openers)
def final(self):
    """Return a random parting line from the loaded script."""
    return choice(self.finals)