414 lines
14 KiB
Python
Executable file
414 lines
14 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
from pathlib import Path
|
|
from random import randint, randrange, choice
|
|
from re import split, sub
|
|
|
|
from evennia.utils import logger, delay
|
|
|
|
from commands.misc import CmdSetCat, CmdSetOctopus, CmdSetFrog
|
|
from typeclasses.objects import Object
|
|
from utils.word_list import routput
|
|
|
|
|
|
class NPC(Object):
    """
    An NPC is an NPC because it can react to what it _hears_.

    To do this, implement 'at_heard_say', a function that returns the
    response: either a plain string, or a ``(mode, response)`` pair.
    """

    def at_heard_say(self, message, from_obj,
                     is_say=True, is_whisper=False):
        """
        Override to return a response to `message`.

        Args:
            message: The spoken text, with the ``<Person> says, `` prefix
                and the surrounding quotes removed.
            from_obj: The object the message came from.
            is_say: True when the message was tagged with type 'say'.
            is_whisper: True when the message was tagged with type 'whisper'.

        Returns:
            None to stay silent, a string to speak, or a two-item
            ``(mode, response)`` sequence. A `mode` of "shout" makes `msg`
            call `at_say` with ``just_owner=True``.
        """
        return None

    def msg(self, text=None, from_obj=None, **kwargs):
        """
        Custom msg() method reacting to say.

        The message is always passed on to the parent `msg`; reacting to
        it is best effort and never blocks delivery.
        """
        # make sure to not repeat what we ourselves said or we'll
        # create a loop
        if from_obj != self:
            try:
                self._react_to_speech(text, from_obj)
            except Exception:
                # A faulty at_heard_say/at_say override must not break
                # message delivery, but it should not vanish silently.
                logger.log_trace(f"{self} failed to react to a message")

        # this is needed if anyone ever puppets this NPC - without it
        # you would never get any feedback from the server (not even
        # the results of look)
        super().msg(text=text, from_obj=from_obj, **kwargs)

    def _react_to_speech(self, text, from_obj):
        """Parse an incoming message and speak the at_heard_say response."""
        # `text` is either a plain string or a (string, {"type": ...}) pair.
        if isinstance(text, (tuple, list)):
            say_text = text[0] if text else None
            options = text[1] if len(text) > 1 else None
            msg_type = options.get('type') if isinstance(options, dict) else None
        else:
            say_text, msg_type = text, None

        if not isinstance(say_text, str):
            return

        # message will be on the form `<Person> says, "say_text"`
        # we want to get only say_text without the quotes and any spaces
        _, marker, spoken = say_text.partition('says, ')
        if not marker:
            return
        message = spoken.strip(' "')

        reply = self.at_heard_say(message, from_obj,
                                  msg_type == 'say', msg_type == 'whisper')
        if reply is None:
            return
        if isinstance(reply, str):
            shout, response = None, reply
        else:
            shout, response = reply

        if response is not None:
            # NOTE(review): "shout" maps to just_owner=True, which looks
            # inverted for a shout - confirm this is intended.
            self.at_say(response, just_owner=(shout == "shout"))
|
|
|
|
|
|
class CarriableNPC(NPC):
    """
    A carriable NPC is like any other NPC, except that since it can
    be carried and isn't locked down, it can't hear conversation in a
    room.
    """

    def at_say(self, message, msg_self=None, msg_location=None,
               receivers=None, msg_receivers=None, just_owner=True,
               **kwargs):
        """
        Does the best it can to speak out loud.

        Args:
            message: The text to speak.
            msg_self, msg_location, receivers, msg_receivers: Passed on
                to the parent `at_say` when the NPC sits directly in a room.
            just_owner: When carried, True tells only the carrier; False
                tells the carrier and everyone else around the carrier.
            **kwargs: Accepted for signature compatibility; not used.
        """
        owner = self.location
        if owner is None:
            # Nowhere to speak and nobody to speak to.
            return

        if owner.is_typeclass("typeclasses.rooms.Room"):
            super().at_say(message, msg_self=msg_self,
                           msg_location=msg_location,
                           receivers=receivers,
                           msg_receivers=msg_receivers)
        elif just_owner:
            owner.msg(f"The {self.name} says, \"{message}\"")
        else:
            owner.msg(
                f"The {self.name}, you are carrying, says, \"{message}\"")
            surroundings = owner.location
            if surroundings is not None:
                # msg_contents (not msg) is what reaches the objects in
                # the location and understands `exclude`.
                surroundings.msg_contents(
                    f"The {self.name}, carried by {owner.name}, says, \"{message}\"",
                    exclude=owner)
|
|
|
|
|
|
class Familiar(NPC):
    """
    Parent class for all familiars.

    These are pets that can be controlled by their owner to do antics.
    """

    def at_do(self, owner, antics, alias=None):
        """
        Have `owner` announce this familiar performing `antics`.

        The familiar is referred to by `alias` when one is given and it
        differs from the noun of its name; otherwise a randomly shortened
        form of its name is used.
        """
        def split_words(full_name):
            """Break a long name into its non-empty words."""
            return [piece for piece in split(r"[, ]+", full_name)
                    if piece != ""]

        def shortened(words):
            """
            Keep a random number of trailing adjectives plus the noun,
            so 'fat, black cat' may become any of:

              - fat, black cat
              - black cat
              - cat
            """
            last = len(words) - 1
            first_kept = randint(0, last)
            noun = words[-1]
            adjs = ', '.join(words[first_kept:last])
            if len(adjs) > 0:
                return f"{adjs} {noun}"
            return noun

        parts = split_words(self.db._sdesc or self.name)

        # The familiar's name:
        use_generated = not alias or alias == parts[-1]
        name = shortened(parts) if use_generated else alias

        # Antics starting with an apostrophe attach directly to the name.
        if antics.startswith("'"):
            emote = f"$Your() {name}{antics}"
        else:
            emote = f"$Your() {name} {antics}"
        owner.announce_action(emote)
|
|
|
|
|
|
class Cat(Familiar):
    """
    A puppetable 'cat' that acts based on that 'command'.
    """

    def at_object_creation(self):
        """Attach the cat command set, persistently, on first creation."""
        handler = self.cmdset
        handler.add(CmdSetCat, persistent=True)
|
|
|
|
|
|
class Octopus(Familiar):
    """
    A puppetable 'octopus' that acts based on that 'command'.
    """

    def at_object_creation(self):
        """Attach the octopus command set, persistently, on first creation."""
        handler = self.cmdset
        handler.add(CmdSetOctopus, persistent=True)
|
|
|
|
class Frog(Familiar):
    """
    A puppetable 'frog' that acts based on that 'command'.
    """

    def at_object_creation(self):
        """Attach the frog command set, persistently, on first creation."""
        handler = self.cmdset
        handler.add(CmdSetFrog, persistent=True)
|
|
|
|
|
|
class Key:
    """
    Internal representation of an input key for the Eliza class.

    Holds the keyword itself, its numeric weight and the list of
    decompositions registered under it.
    """

    def __init__(self, word, weight, decomps):
        """Store the three fields exactly as given (no copies are made)."""
        self.word, self.weight, self.decomps = word, weight, decomps
|
|
|
|
|
|
class Decomp:
    """
    Internal representation of a subsection of the data file for Eliza class.

    Holds the pattern parts, the save flag, the list of reassemblies and
    a counter for the next reassembly to hand out, which starts at 0.
    """

    def __init__(self, parts, save, reasmbs):
        """Store the fields as given and reset the reassembly counter."""
        self.next_reasmb_index = 0
        self.parts, self.save, self.reasmbs = parts, save, reasmbs
|
|
|
|
|
|
class Eliza(NPC):
    """
    Does its best to respond and act like an NPC.

    Stateful representation of an NPC's communication.
    Create with the command:

        @create/drop npc: typeclasses.npcs.Eliza
        @set heron/data_file = "npcs.txt"
    """

    def at_init(self):
        """
        Called whenever typeclass is cached from memory,
        at least once every server restart/reload.

        Resets every response table and reloads it from the data file
        named by ``self.db.data_file`` (default "npcs.txt").
        """
        super().at_init()

        self.initials = []
        self.finals = []
        self.quits = []
        self.pres = {}
        self.posts = {}
        self.synons = {}
        self.keys = {}
        self.memory = []

        data_file = self.db.data_file or "npcs.txt"
        logger.info(f"Reading file, {data_file}")
        self.load_responses(data_file)

    def other_say(self, speaker, speech):
        """
        React to `speech` by announcing a response in our location.

        Nothing happens when `speech` is empty, when `respond` has no
        answer, or when this NPC has no location. `speaker` is unused.
        """
        if not speech:
            return
        response = self.respond(speech)
        if not response:
            return
        room = self.location
        if room is None:
            return
        # Announce after a one-unit delay rather than immediately.
        delay(1, room.msg_contents, self.format_speech(response))

    def format_speech(self, speech):
        """
        Wrap `speech` in a randomly chosen name / verb / sentence layout.

        The verb list depends on whether `speech` ends with "?" or "!",
        and each list can be overridden through a ``self.db`` attribute.
        """
        name = choice([
            self.key,
            self.db._sdesc or self.name,
            self.key.split(" ")[-1]
        ])

        if speech.endswith("?"):
            vocalizes = choice(self.db.speech_ask_vocalizations or [
                "asks",
                "questions"  # etc.
            ])
        elif speech.endswith("!"):
            vocalizes = choice(self.db.speech_exclaim_vocalizations or [
                "exclaims",
                "excitedly says"  # etc.
            ])
        else:
            vocalizes = choice(self.db.speech_vocalizations or [
                "says",
                "wryly says"  # etc.
            ])

        # NOTE(review): entries of self.db.speech_formats are returned
        # verbatim - name, verb and speech are not interpolated into
        # them. Confirm that is the intended behaviour.
        return choice(self.db.speech_formats or [
            f"The {name} {vocalizes}, \"{speech}\"",
            f"\"{speech}\" {vocalizes} the {name}."
        ])

    def load_responses(self, data_file):
        """
        Fill the response tables from `data_file`.

        The file is looked up next to this module. Each non-blank line
        has the form ``tag: content``; only the first colon separates
        the two, so the content itself may contain colons. Lines with
        an unrecognised tag are ignored.

        Raises:
            ValueError: For a line without a colon, or for a 'decomp' /
                'reasmb' line that has no preceding 'key' / 'decomp'.
        """
        key = None
        decomp = None

        plain_lists = {
            'initial': self.initials,
            'final': self.finals,
            'quit': self.quits,
        }
        word_maps = {'pre': self.pres, 'post': self.posts}

        path = Path(__file__).with_name(data_file)

        with open(path, encoding="utf-8") as file:
            for line_no, line in enumerate(file, start=1):
                if not line.strip():
                    continue

                tag, colon, content = line.partition(':')
                if not colon:
                    raise ValueError(
                        f"{data_file}:{line_no}: expected 'tag: content'")
                tag = tag.strip()
                content = content.strip()

                if tag in plain_lists:
                    plain_lists[tag].append(content)
                elif tag in word_maps:
                    head, *rest = content.split(' ')
                    word_maps[tag][head] = rest
                elif tag == 'synon':
                    parts = content.split(' ')
                    # The root word is kept as a member of its own group.
                    self.synons[parts[0]] = parts
                elif tag == 'key':
                    parts = content.split(' ')
                    word = parts[0]
                    weight = int(parts[1]) if len(parts) > 1 else 1
                    key = Key(word, weight, [])
                    self.keys[word] = key
                elif tag == 'decomp':
                    if key is None:
                        raise ValueError(
                            f"{data_file}:{line_no}: 'decomp' before any 'key'")
                    parts = content.split(' ')
                    # A leading '$' marks a decomposition whose output
                    # is stored in memory instead of being spoken.
                    save = parts[0] == '$'
                    if save:
                        parts = parts[1:]
                    decomp = Decomp(parts, save, [])
                    key.decomps.append(decomp)
                elif tag == 'reasmb':
                    if decomp is None:
                        raise ValueError(
                            f"{data_file}:{line_no}: 'reasmb' before any 'decomp'")
                    decomp.reasmbs.append(content.split(' '))

    def _match_decomp_r(self, parts, words, results):
        """
        Recursively match pattern `parts` against `words`.

        '*' matches any run of words (longest run tried first), '@root'
        matches one word from the synonym group `root`, anything else
        must equal the word case-insensitively. The words captured by
        each '*' and '@' are appended to `results`.

        Returns:
            True on a complete match, otherwise False.

        Raises:
            ValueError: If a '@root' names an unknown synonym group.
        """
        if not parts and not words:
            return True
        if not parts or (not words and parts != ['*']):
            return False

        head, rest = parts[0], parts[1:]

        if head == '*':
            for index in range(len(words), -1, -1):
                results.append(words[:index])
                if self._match_decomp_r(rest, words[index:], results):
                    return True
                # Backtrack: this split did not lead to a match.
                results.pop()
            return False

        if head.startswith('@'):
            root = head[1:]
            if root not in self.synons:
                raise ValueError(f"Unknown synonym root {root}")
            if words[0].lower() not in self.synons[root]:
                return False
            results.append([words[0]])
            return self._match_decomp_r(rest, words[1:], results)

        if head.lower() != words[0].lower():
            return False
        return self._match_decomp_r(rest, words[1:], results)

    def _match_decomp(self, parts, words):
        """Return the captures of matching `parts` to `words`, or None."""
        results = []
        if self._match_decomp_r(parts, words, results):
            return results
        return None

    def _next_reasmb(self, decomp):
        """Return the next reassembly of `decomp`, cycling round-robin."""
        index = decomp.next_reasmb_index
        result = decomp.reasmbs[index % len(decomp.reasmbs)]
        decomp.next_reasmb_index = index + 1
        return result

    def _reassemble(self, reasmb, results):
        """
        Build the output words for `reasmb`.

        A word of the form '(N)' is replaced by the N-th capture
        (1-based), cut off at its first ',', '.' or ';' token. Empty
        words are dropped; all other words are copied as they are.

        Raises:
            ValueError: If N is not an integer or is out of range.
        """
        output = []
        for reword in reasmb:
            if not reword:
                continue
            if reword[0] == '(' and reword[-1] == ')':
                index = int(reword[1:-1])
                if index < 1 or index > len(results):
                    raise ValueError(f"Invalid result index {index}")
                insert = results[index - 1]
                for punct in [',', '.', ';']:
                    if punct in insert:
                        insert = insert[:insert.index(punct)]
                output.extend(insert)
            else:
                output.append(reword)
        return output

    def _sub(self, words, sub):
        """Replace each word that has an entry (by lowercase) in `sub`."""
        output = []
        for word in words:
            word_lower = word.lower()
            if word_lower in sub:
                output.extend(sub[word_lower])
            else:
                output.append(word)
        return output

    def _match_key(self, words, key):
        """
        Try the decompositions of `key` in order against `words`.

        Returns:
            The output words of the first matching decomposition that
            is not a 'save' one, or None. Output of matching 'save'
            decompositions is appended to ``self.memory`` instead.

        Raises:
            ValueError: If a reassembly says 'goto' an unknown key.
        """
        for decomp in key.decomps:
            results = self._match_decomp(decomp.parts, words)
            if results is None:
                continue

            results = [self._sub(captured, self.posts)
                       for captured in results]
            reasmb = self._next_reasmb(decomp)

            if reasmb[0] == 'goto':
                goto_key = reasmb[1]
                if goto_key not in self.keys:
                    raise ValueError(f"Invalid goto key {goto_key}")
                return self._match_key(words, self.keys[goto_key])

            output = self._reassemble(reasmb, results)
            if decomp.save:
                # Remember for later and keep looking for a reply.
                self.memory.append(output)
                continue
            return output
        return None

    def respond(self, text):
        """
        Produce a response to `text`.

        Keys found in the text are tried from the highest weight down.
        When none produces output, a random remembered output is used,
        and after that the 'xnone' key's first decomposition.

        Returns:
            The response as a string, or None when `text` is one of the
            quit phrases or when no usable 'xnone' fallback is loaded.
        """
        if text.lower() in self.quits:
            return None

        # Turn punctuation into separate, space-delimited tokens.
        text = sub(r'\s*\.+\s*', ' . ', text)
        text = sub(r'\s*,+\s*', ' , ', text)
        text = sub(r'\s*;+\s*', ' ; ', text)

        words = [w for w in text.split(' ') if w]
        words = self._sub(words, self.pres)

        keys = [self.keys[w.lower()] for w in words if w.lower() in self.keys]
        keys = sorted(keys, key=lambda k: -k.weight)

        output = None

        for key in keys:
            output = self._match_key(words, key)
            if output:
                break

        if not output:
            if self.memory:
                index = randrange(len(self.memory))
                output = self.memory.pop(index)
            else:
                fallback = self.keys.get('xnone')
                if (fallback is None or not fallback.decomps
                        or not fallback.decomps[0].reasmbs):
                    # No fallback available: stay silent, don't crash.
                    return None
                output = self._next_reasmb(fallback.decomps[0])

        return " ".join(output)

    def initial(self):
        """Return a random 'initial' line; IndexError when none loaded."""
        return choice(self.initials)

    def final(self):
        """Return a random 'final' line; IndexError when none loaded."""
        return choice(self.finals)
|