This commit gives an "AI" capability to any object (still triggered by the 'say' command), so now the Wee Beastie can do more than purr. Also fixes the Witch and the Dragon's movements throughout the Realm. We can also Scry into rooms in order to watch the behavior of the Chatbots.
977 lines
38 KiB
Python
Executable file
977 lines
38 KiB
Python
Executable file
#!/usr/bin/env python
|
|
"""
|
|
Object
|
|
|
|
The Object is the class for general items in the game world.
|
|
|
|
Use the ObjectParent class to implement common features for *all* entities
|
|
with a location in the game world (like Characters, Rooms, Exits).
|
|
|
|
"""
|
|
|
|
import anthropic
|
|
import json
|
|
from datetime import datetime
|
|
from os import makedirs, path
|
|
from os.path import dirname, exists
|
|
from pathlib import Path
|
|
from re import split, match, search, sub, IGNORECASE
|
|
from shutil import copyfile
|
|
from random import randint, choice
|
|
|
|
from django.conf import settings
|
|
|
|
from evennia.contrib.rpg.rpsystem.rpsystem import ContribRPObject, parse_sdescs_and_recogs
|
|
from evennia.objects.models import ObjectDB
|
|
from evennia.prototypes.spawner import spawn
|
|
from evennia.utils import delay, logger
|
|
from evennia.utils.search import search_object
|
|
|
|
from utils.scoring import Scores
|
|
from utils.word_list import routput
|
|
|
|
|
|
# Directory (a relative path) that AI.history_file() joins with a
# per-speaker file name to locate the JSON conversation histories.
personality_dir = "personalities"
|
|
|
|
|
|
def fix_paragraph(paragraph):
    """
    Trim a trailing sentence fragment from a generated paragraph.

    Because the number of tokens is small, a response may end mid-sentence.
    The paragraph is split into sentences after '.', '!' or '?', and the
    last piece is dropped when it does not end in one of those marks
    (optionally followed by a closing double quote).

    A paragraph that consists of nothing but one unterminated fragment is
    returned unchanged, so the caller never receives an empty string for
    non-empty input.

    Args:
        paragraph (str): The text to tidy up.

    Returns:
        str: The complete sentences, joined by single spaces.
    """
    sentences = split(r'(?<=[.!?])\s+', paragraph)
    # Only drop the tail when something remains afterwards.
    if len(sentences) > 1 and not search(r"[.!?]\"?$", sentences[-1]):
        sentences.pop()
    return ' '.join(sentences)
|
|
|
|
|
|
class ObjectParent:
    """
    This is a mixin that can be used to override *all* entities inheriting at
    some distance from DefaultObject (Objects, Exits, Characters and Rooms).

    Just add any method that exists on `DefaultObject` to this class. If one
    of the derived classes has itself defined that same hook already, that will
    take precedence.

    """
    # Appearance layout with placeholders for the description, exits,
    # characters, things and footer.
    appearance_template = """|/{desc}
{exits}
{characters}
{things}
{footer}
    """

    def get_display_footer(self, _, **kwargs):
        """Return the footer of the appearance: a single newline."""
        return "\n"

    def has_method(self, method_name):
        """True if this object has a method of a particular name."""
        return hasattr(self, method_name) and callable(getattr(self, method_name))

    def get_name(self, alias=None):
        """
        Return `alias`, or a randomly shortened form of this object's name.

        The name is read from `self.db._sdesc`, falling back to `self.name`,
        and split on commas and spaces.  The last word is the noun, the
        words before it are adjectives, and a random number of leading
        adjectives is dropped.  So "fat, black cat" may come back as:

          - fat, black cat
          - black cat
          - cat

        Args:
            alias (str, optional): Name to return instead.  If it equals the
                noun, a shortened name is generated as if it was not given.

        Returns:
            str: The alias or the shortened name.  An empty name yields the
            alias (or "") instead of raising.
        """
        parts = [word
                 for word in split(r"[, ]+", self.db._sdesc or self.name)
                 if word != ""]
        if not parts:
            return alias or ""

        # The familiar's name:
        if alias and alias != parts[-1]:
            return alias

        *adjectives, noun = parts
        kept = adjectives[randint(0, len(adjectives)):]
        return f"{', '.join(kept)} {noun}" if kept else noun

    def has(self, item):
        """
        Return the first object in `self.contents` matching `item`.

        `item` is probably a string, matched against an object's key or
        its aliases.  It can also be a type, which matches objects of
        exactly that type, for instance:

            character.has(typeclasses.drinkables.TeaCup)

        Anything else is compared to each object with `==`.

        Returns:
            The matching object, or None if nothing matches.
        """
        for held in self.contents:
            if isinstance(item, str) and (held.key == item or held.aliases.get(item)):
                return held

            if item is type(held) or held == item:
                return held

        return None

    def client_height(self):
        """
        Get the client screenheight for the session using this command.

        Returns:
            client height (int): The height (in characters) of the client
            window, or `settings.CLIENT_DEFAULT_HEIGHT` when no session is
            attached to this object.

        Not sure why this isn't part of the engine.
        """
        if self.sessions:
            # The session list can be empty, so check before indexing.
            sessions = self.sessions.get()
            if sessions:
                return sessions[0].protocol_flags.get(
                    "SCREENHEIGHT", {0: settings.CLIENT_DEFAULT_HEIGHT}
                )[0]
        return settings.CLIENT_DEFAULT_HEIGHT

    def characters_here(self, puppets=False):
        """
        Return a list of characters in the current location.

        Args:
            puppets (bool): Also include `typeclasses.puppets.Puppet`
                objects when True.

        Returns:
            list: Every other Character (and optionally Puppet) sharing
            this object's location; empty when there is no location.
        """
        if self.location is None:
            return []

        return [char
                for char in self.location.contents
                if char != self and
                (char.is_typeclass("typeclasses.characters.Character")
                 or (char.is_typeclass("typeclasses.puppets.Puppet") and puppets))]

    def get_search_result(self, searchdata, attribute_name=None,
                          typeclass=None, candidates=None, exact=False,
                          use_dbref=None, tags=None, **kwargs):
        """
        Redefine function to address bug in `ContribRPObject`
        where non-builder characters need to call the system's
        search when looking for aliases.

        All named arguments are forwarded unchanged to
        `ObjectDB.objects.search_object`; extra `kwargs` are accepted but
        not passed on.
        """
        return ObjectDB.objects.search_object(
            searchdata,
            attribute_name=attribute_name,
            typeclass=typeclass,
            candidates=candidates,
            exact=exact,
            use_dbref=use_dbref,
            tags=tags,
        )

    def delay_sequence(self, sequence_str, time_delay=1, *args):
        """Run a sequence of messages or commands with a delay.

        The 'sequence_str' is a number of commands separated by ';;'
        character sequence. The command can be standard things like
        'say' or 'give', but can be a message delivered to the room
        location, if it begins with a # or 'gm'.

        The command can also be a number, in which case, the next
        command (and all subsequent commands), will be delayed by that
        number of seconds.

        For instance:

            'say Hello there, want a drink? ;; 8 ;; # He works on shaking a cocktail. ;; 2 ;; shake whisky = avatar'

        Which could show:

            Blonde elf says, "Hello there, want a drink?"
              <8 seconds pass>
            He works on shaking a cocktail.
              <2 seconds pass>
            You now have a whisky.

        Args:
            sequence_str (str): The ';;'-separated script.
            time_delay (int): Seconds to wait before each step until the
                script sets another value.
            *args: Passed to `routput` along with every message/command.

        While a sequence is pending it is remembered in
        `self.ndb.current_sequence`; asking for the very same sequence
        again in the meantime is ignored.
        """
        def convert(x):
            try:
                return int(x)
            except ValueError:
                return x

        if self.ndb.current_sequence and self.ndb.current_sequence == sequence_str:
            logger.info("Duplicate sequences. Ignoring.")
            return
        self.ndb.current_sequence = sequence_str

        # Room messages go to our location, or to our own contents when we
        # have no location.
        broadcast = self.location.msg_contents if self.location else self.msg_contents
        # 'do_cmd' only exists on some classes using this mixin; fall back
        # to the plain command executor otherwise.
        run_command = self.do_cmd if self.has_method("do_cmd") else self.execute_cmd

        pause = 0
        for step in (convert(part) for part in split(r" *;; *", sequence_str)):
            if isinstance(step, int):
                time_delay = step
                continue

            pause = pause + time_delay
            m = match(r"(^# *|^gm +)(.*)", step)
            if m:
                logger.info(f"GM'd: {m.group(2)}")
                delay(pause, broadcast, routput(m.group(2), *args))
            else:
                delay(pause, run_command, routput(step, *args))

        # Forget the sequence once its last step has been scheduled to run.
        delay(pause, self.nattributes.remove, "current_sequence")
|
|
|
|
|
|
class Object(ObjectParent, ContribRPObject):
    """
    This is the root Object typeclass, representing all entities that
    have an actual presence in-game. DefaultObjects generally have a
    location. They can also be manipulated and looked at. Game
    entities you define should inherit from DefaultObject at some distance.

    In this game the class additionally:

     - deletes an object five seconds after it lands inside a Pet
       (see `at_post_move`),
     - tells the receiver of a gift about it through its `other_given`
       method, if it has one (see `at_give`),
     - searches by sdesc first and falls back to the default search
       (see `get_search_result`).

    It is recommended to create children of this class using the
    `evennia.create_object()` function rather than to initialize the class
    directly - this will both set things up and efficiently save the object
    without `obj.save()` having to be called explicitly.

    Note: Check the autodocs for complete class members, this may not always
    be up to date.

    * Base properties defined/available on all Objects

     key (string) - name of object
     name (string)- same as key
     dbref (int, read-only) - unique #id-number. Also "id" can be used.
     date_created (string) - time stamp of object creation

     account (Account) - controlling account (if any, only set together with
                       sessid below)
     sessid (int, read-only) - session id (if any, only set together with
                       account above). Use `sessions` handler to get the
                       Sessions directly.
     location (Object) - current location. Is None if this is a room
     home (Object) - safety start-location
     has_account (bool, read-only)- will only return *connected* accounts
     contents (list, read only) - returns all objects inside this object
     exits (list of Objects, read-only) - returns all exits from this
                       object, if any
     destination (Object) - only set if this object is an exit.
     is_superuser (bool, read-only) - True/False if this user is a superuser
     is_connected (bool, read-only) - True if this object is associated with
                            an Account with any connected sessions.
     has_account (bool, read-only) - True if this object has an associated account.
     is_superuser (bool, read-only): True if this object has an account and that
                        account is a superuser.

    * Handlers available

     aliases - alias-handler: use aliases.add/remove/get() to use.
     permissions - permission-handler: use permissions.add/remove() to
                   add/remove new perms.
     locks - lock-handler: use locks.add() to add new lock strings
     scripts - script-handler. Add new scripts to object with scripts.add()
     cmdset - cmdset-handler. Use cmdset.add() to add new cmdsets to object
     nicks - nick-handler. New nicks with nicks.add().
     sessions - sessions-handler. Get Sessions connected to this
                object with sessions.get()
     attributes - attribute-handler. Use attributes.add/remove/get.
     db - attribute-handler: Shortcut for attribute-handler. Store/retrieve
            database attributes using self.db.myattr=val, val=self.db.myattr
     ndb - non-persistent attribute handler: same as db but does not create
            a database entry when storing data

    * Helper methods (see src.objects.objects.py for full headers)

     get_search_query_replacement(searchdata, **kwargs)
     get_search_direct_match(searchdata, **kwargs)
     get_search_candidates(searchdata, **kwargs)
     get_search_result(searchdata, attribute_name=None, typeclass=None,
                       candidates=None, exact=False, use_dbref=None, tags=None, **kwargs)
     get_stacked_result(results, **kwargs)
     handle_search_results(searchdata, results, **kwargs)
     search(searchdata, global_search=False, use_nicks=True, typeclass=None,
            location=None, attribute_name=None, quiet=False, exact=False,
            candidates=None, use_locks=True, nofound_string=None,
            multimatch_string=None, use_dbref=None, tags=None, stacked=0)
     search_account(searchdata, quiet=False)
     execute_cmd(raw_string, session=None, **kwargs)
     msg(text=None, from_obj=None, session=None, options=None, **kwargs)
     for_contents(func, exclude=None, **kwargs)
     msg_contents(message, exclude=None, from_obj=None, mapping=None,
                  raise_funcparse_errors=False, **kwargs)
     move_to(destination, quiet=False, emit_to_obj=None, use_destination=True)
     clear_contents()
     create(key, account, caller, method, **kwargs)
     copy(new_key=None)
     at_object_post_copy(new_obj, **kwargs)
     delete()
     is_typeclass(typeclass, exact=False)
     swap_typeclass(new_typeclass, clean_attributes=False, no_default=True)
     access(accessing_obj, access_type='read', default=False,
            no_superuser_bypass=False, **kwargs)
     filter_visible(obj_list, looker, **kwargs)
     get_default_lockstring()
     get_cmdsets(caller, current, **kwargs)
     check_permstring(permstring)
     get_cmdset_providers()
     get_display_name(looker=None, **kwargs)
     get_extra_display_name_info(looker=None, **kwargs)
     get_numbered_name(count, looker, **kwargs)
     get_display_header(looker, **kwargs)
     get_display_desc(looker, **kwargs)
     get_display_exits(looker, **kwargs)
     get_display_characters(looker, **kwargs)
     get_display_things(looker, **kwargs)
     get_display_footer(looker, **kwargs)
     format_appearance(appearance, looker, **kwargs)
     return_appearance(looker, **kwargs)

    * Hooks (these are class methods, so args should start with self):

     basetype_setup()     - only called once, used for behind-the-scenes
                            setup. Normally not modified.
     basetype_posthook_setup() - customization in basetype, after the object
                            has been created; Normally not modified.

     at_object_creation() - only called once, when object is first created.
                            Object customizations go here.
     at_object_delete() - called just before deleting an object. If returning
                            False, deletion is aborted. Note that all objects
                            inside a deleted object are automatically moved
                            to their <home>, they don't need to be removed here.

     at_init()            - called whenever typeclass is cached from memory,
                            at least once every server restart/reload
     at_first_save()
     at_cmdset_get(**kwargs) - this is called just before the command handler
                            requests a cmdset from this object. The kwargs are
                            not normally used unless the cmdset is created
                            dynamically (see e.g. Exits).
     at_pre_puppet(account)- (account-controlled objects only) called just
                            before puppeting
     at_post_puppet()     - (account-controlled objects only) called just
                            after completing connection account<->object
     at_pre_unpuppet()    - (account-controlled objects only) called just
                            before un-puppeting
     at_post_unpuppet(account) - (account-controlled objects only) called just
                            after disconnecting account<->object link
     at_server_reload()   - called before server is reloaded
     at_server_shutdown() - called just before server is fully shut down

     at_access(result, accessing_obj, access_type) - called with the result
                            of a lock access check on this object. Return value
                            does not affect check result.

     at_pre_move(destination)             - called just before moving object
                        to the destination. If returns False, move is cancelled.
     announce_move_from(destination)         - called in old location, just
                        before move, if obj.move_to() has quiet=False
     announce_move_to(source_location)       - called in new location, just
                        after move, if obj.move_to() has quiet=False
     at_post_move(source_location)          - always called after a move has
                        been successfully performed.
     at_pre_object_leave(leaving_object, destination, **kwargs)
     at_object_leave(obj, target_location, move_type="move", **kwargs)
     at_object_leave(obj, target_location)   - called when an object leaves
                        this object in any fashion
     at_pre_object_receive(obj, source_location)
     at_object_receive(obj, source_location, move_type="move", **kwargs) -
                        called when this object receives another object
     at_post_move(source_location, move_type="move", **kwargs)

     at_traverse(traversing_object, target_location, **kwargs) - (exit-objects only)
                              handles all moving across the exit, including
                              calling the other exit hooks. Use super() to retain
                              the default functionality.
     at_post_traverse(traversing_object, source_location) - (exit-objects only)
                              called just after a traversal has happened.
     at_failed_traverse(traversing_object)      - (exit-objects only) called if
                       traversal fails and property err_traverse is not defined.

     at_msg_receive(self, msg, from_obj=None, **kwargs) - called when a message
                             (via self.msg()) is sent to this obj.
                             If returns false, aborts send.
     at_msg_send(self, msg, to_obj=None, **kwargs) - called when this objects
                             sends a message to someone via self.msg().

     return_appearance(looker) - describes this object. Used by "look"
                                 command by default
     at_desc(looker=None)      - called by 'look' whenever the
                                 appearance is requested.
     at_pre_get(getter, **kwargs)
     at_get(getter)            - called after object has been picked up.
                                 Does not stop pickup.
     at_pre_give(giver, getter, **kwargs)
     at_give(giver, getter, **kwargs)
     at_pre_drop(dropper, **kwargs)
     at_drop(dropper, **kwargs)          - called when this object has been dropped.
     at_pre_say(speaker, message, **kwargs)
     at_say(message, msg_self=None, msg_location=None, receivers=None, msg_receivers=None, **kwargs)

     at_look(target, **kwargs)
     at_desc(looker=None)

    """
    def at_post_move(self, source_location, move_type="move", **kwargs):
        """
        Delete ourselves if we are living inside a pet.

        Runs the parent hook first, forwarding `move_type` and any extra
        keyword arguments.  If the new location is a
        `typeclasses.pets.Pet`, deletion of this object is scheduled five
        seconds later.  A move that leaves the object without a location
        is ignored.
        """
        super().at_post_move(source_location, move_type=move_type, **kwargs)
        destination = self.location
        if destination is not None and destination.is_typeclass("typeclasses.pets.Pet"):
            delay(5, self.delete)

    def at_give(self, giver, getter, **kwargs):
        """
        Call a puppet's 'other_given' method, if defined.

        Runs the parent hook first (forwarding extra keyword arguments),
        then calls `getter.other_given(giver, self)` when the receiver
        has such a callable.
        """
        super().at_give(giver, getter, **kwargs)
        if hasattr(getter, 'other_given') and callable(getter.other_given):
            getter.other_given(giver, self)

    # The RPSystem overrides the `get_search_result` function to be
    # able to look for words in the `sdesc` field, but in the process,
    # dropped the ability to search for objects by `alias`.

    def get_search_result(
        self,
        searchdata,
        candidates=None,
        **kwargs,
    ):
        """
        Override of the parent method for producing search results
        that understands sdescs. These are used in the main .search()
        method of the parent class.

        With candidates, they are first matched by sdesc/recog.  If that
        finds nothing, the default search runs over the same candidates.
        Otherwise each sdesc match is passed through the default search
        by key so that the extra filtering kwargs still apply.  Without
        candidates the default search handles everything.

        Returns:
            The matching objects, without duplicates among sdesc matches.
        """
        # we also want to use the default search method
        search_obj = super().get_search_result

        if candidates is None:
            # no candidates means it's a global search, so we pass it back to the default
            return search_obj(searchdata, **kwargs)

        searched_results = parse_sdescs_and_recogs(
            self, candidates, "/" + searchdata, search_mode=True
        )
        if not searched_results:
            return search_obj(searchdata, candidates=candidates, **kwargs)

        # We do a default search on each result by key, here,
        # to apply extra filtering kwargs
        results = []
        for searched_obj in searched_results:
            results.extend([
                obj
                for obj in search_obj(searched_obj.key, candidates=[searched_obj], **kwargs)
                if obj not in results])
        return results
|
|
|
|
|
|
class Listener:
    """
    Mix in class with methods for being able to respond
    to events from characters.

    Responses are "sequences" (see `delay_sequence`) stored as Attributes
    on the listening object, keyed by the event name ('arrive', 'leave',
    'sit', 'stand', 'say', 'sayto').
    """
    def get_character_label(self, character):
        """
        Return the last word of the character's display name, as seen by
        this object, with `|x` style markup codes removed.
        """
        # The pipe must be escaped: unescaped it is an alternation whose
        # empty branch always matches, so nothing would be removed.
        cleaned = sub(r"\|[A-z/]", "",
                      character.get_display_name(self))
        return cleaned.split(' ')[-1]

    def trigger_sequence(self, seq, character, *other_stuff):
        """
        The actions associated with the trigger

        Runs `seq` through `delay_sequence` with a one second delay and
        these positional values:

          {0} - character's real name
          {1} - character's sdesc (from puppet's pov)
          {2} - character's label, last word in sdesc
          {3} - character object

        followed by anything given in `other_stuff`.
        """
        logger.info(f"Triggering {seq}")
        self.delay_sequence(seq, 1, character.key,
                            character.get_display_name(self),
                            self.get_character_label(character),
                            character, *other_stuff)

    def get_attribute_trigger(self, label, character):
        """
        Return the attribute where 'key' is the label, and the
        category can be either the character's key or its label.
        Falls back to the attribute without any category.
        """
        name = self.get_character_label(character)
        return (self.attributes.get(key=label, category=character.key) or
                self.attributes.get(key=label, category=name) or
                self.attributes.get(key=label))

    def do_trigger(self, action, character):
        """
        Run the sequence stored for 'action' and 'character', if any.
        """
        seq = self.get_attribute_trigger(action, character)
        if seq:
            self.trigger_sequence(seq, character)

    def other_arrive(self, character):
        """
        Execute a command when a character arrives in the same location.
        """
        self.do_trigger('arrive', character)

    def other_leave(self, character):
        """
        Execute a command when a character leaves the same location.
        """
        self.do_trigger('leave', character)

    def other_sit(self, character):
        """
        Execute a command when a character sits on something in a location.
        """
        self.do_trigger('sit', character)

    def other_stand(self, character):
        """
        Execute a command when a character stands in a location.
        """
        self.do_trigger('stand', character)

    def say_triggers(self, label, character, speech):
        """
        Run the sequence(s) stored under `label` that react to `speech`.

        The stored value is either a single sequence string, which is
        always run, or a mapping of regular expressions to sequences.  In
        the latter case every sequence whose pattern matches the start of
        `speech` (ignoring case) is run.
        """
        from re import error as regex_error

        trigs = self.get_attribute_trigger(label, character)
        if not trigs:
            return

        if isinstance(trigs, str):
            self.trigger_sequence(trigs, character)
            return

        try:
            table = dict(trigs)
        except (TypeError, ValueError):
            # Not a mapping after all: treat it as one sequence.
            self.trigger_sequence(trigs, character)
            return

        for pattern, seq in table.items():
            try:
                matched = match(pattern, speech, IGNORECASE)
            except regex_error:
                logger.info(f"Ignoring invalid trigger pattern: {pattern}")
                continue
            if matched:
                self.trigger_sequence(seq, character)

    def other_say(self, speaker, speech):
        """React to something said in the room ('say' triggers)."""
        self.say_triggers('say', speaker, speech)

    def other_sayto(self, speaker, speech):
        """React to something said to this object ('sayto' triggers)."""
        self.say_triggers('sayto', speaker, speech)

    def other_given(self, giver, obj):
        """Thank the giver, addressed by the last word of its display name."""
        target = giver.get_display_name(self).split(' ')[-1]
        self.execute_cmd(f"emote /Me says to /{target}, \"Thanks for the {obj.name}.\"")

    def do_cmd(self, cmd):
        """
        Like 'execute_cmd', but for objects.

        Recognizes these extra commands, tried in this order:

            teleport <object> = <destination>
            gmm <object> = <message>       private message to one object
            gma <object> = <message>       object's announce_action message
            tag_all <tag> / untag_all <tag>
            gift_all <gift>[: name[: desc]]
            gift <gift> [to|=] <recipient>[: name[: desc]]
            coin_all <amount>
            coin <amount> [to|=] <recipient>
            scoring_all <score>
            scoring <score> [to|=] <recipient>

        The '_all' variants apply to every character and puppet in the
        room.  Anything else is handed to `execute_cmd` when this object
        is a Character, and only logged otherwise.

        NOTE(review): a recognized command counts as handled even when the
        object it names cannot be found.
        """
        logger.info(f"Executing: {cmd}")

        m = match(r"teleport +(.*?) *= *(.*)", cmd)
        if m:
            o = search_object(m.group(1)).first()
            d = search_object(m.group(2)).first()
            if o and d:
                logger.info(f"Teleporting: {m.group(1)} {o} to {d}")
                o.move_to(d, quiet=True)
            return

        # Private message only a single character can hear:
        m = match(r"gmm +(.*?) *= *(.*)", cmd)
        if m:
            obj = search_object(m.group(1)).first()
            msg = m.group(2)
            if obj and msg:
                obj.msg(msg)
            return

        # Character announce_action message:
        m = match(r"gma +(.*?) *= *(.*)", cmd)
        if m:
            obj = search_object(m.group(1)).first()
            msg = m.group(2)
            if obj and msg:
                obj.announce_action(msg)
            return

        m = match(r"tag_all +(.*) *", cmd)
        if m:
            tag = m.group(1)
            logger.info(f"Tagging '{tag}'")
            for c in self.characters_here(puppets=True):
                c.tags.add(tag)
            return

        m = match(r"untag_all +(.*) *", cmd)
        if m:
            tag = m.group(1)
            logger.info(f"Untagging '{tag}'")
            for c in self.characters_here(puppets=True):
                c.tags.remove(tag)
            return

        m = match(r"gift_all ([A-z]+)( *: *([^:]+)( *: *(.*))?)?", cmd)
        if m:
            for c in self.characters_here(puppets=True):
                logger.info(f"Highest Gift: {m.group(1)} to {c.key}")
                self.do_gift(c.key, m.group(1), m.group(3), m.group(5))
            return

        m = match(r"gift ([A-z]+) *?( to|=)? *([^:]+)( *: *([^:]+)( *: *(.*))?)?", cmd)
        if m:
            logger.info(f"Higher Gift: {m.group(1)} to {m.group(3)}")
            self.do_gift(m.group(3), m.group(1), m.group(5), m.group(7))
            return

        m = match(r"coin_all ([0-9]+)", cmd)
        if m:
            coins = int(m.group(1))
            for c in self.characters_here(puppets=True):
                logger.info(f"Giving {c.key} {coins} coins.")
                c.adjust_coins(coins)
            return

        m = match(r"coin ([0-9]+) *?( to|=)? *([^:]+)", cmd)
        if m:
            c = self.search(m.group(3))
            if c:
                c.adjust_coins(int(m.group(1)))
            return

        m = match(r"scoring_all ([a-z_]+)", cmd)
        if m:
            tick = Scores[m.group(1)]
            for c in self.characters_here(puppets=True):
                c.score(tick)
            return

        m = match(r"scoring ([a-z_]+) *?( to|=)? *([a-z_]+)", cmd)
        if m:
            tick = Scores[m.group(1)]
            c = self.search(m.group(3))
            if c:
                c.score(tick)
            return

        if self.is_typeclass("typeclasses.characters.Character"):
            self.execute_cmd(cmd)
        else:
            logger.info(f"Can't do {cmd}, yet.")

    def cleanup(self):
        """
        Execute orders.

        Quietly moves every object named in `self.db.objects_here` into
        this object.  The attribute holds one name or a list of names:

            @set thing/objects_here = other_object
            @set thing/objects_here = [one_object, two_object]

        Names that cannot be found are skipped.
        """
        objs = self.db.objects_here
        if not objs:
            return

        names = [objs] if isinstance(objs, str) else objs
        for name in names:
            found = search_object(name).first()
            if found:
                found.move_to(self, quiet=True)

    def do_gift(self, recipient, gift, name=None, desc=None):
        """
        Give a 'gift' by name to a 'recipient'.

        This doesn't give the gift if the recipient has already
        received the gift.

        Args:
            recipient (str): Searched for globally.
            gift (str): One of 'purse', 'ball', 'dice', 'pipe', 'wand',
                'bag', 'junk' or 'blue'.
            name (str, optional): Key for the new object instead of the
                gift's default.
            desc (str, optional): Description instead of the default.

        Returns:
            True when the gift was handed over, otherwise None.
        """
        bag_desc = "Small cloth bag with a leather drawstring.|/You could probably |grummage|n around in that, and maybe |gkeep|n something."
        gifts = {
            'purse': {"typeclass": "typeclasses.things.CoinPurse",
                      "key": name or gift,
                      "desc": desc or "Small leather coin purse, with a gold clasp.",
                      },
            'ball': {"typeclass": "typeclasses.things.CrystalBall",
                     "key": name or "crystal ball",
                     "desc": desc or "Swirling glass ball of colored ectoplasm.",
                     },
            'dice': {"typeclass": "typeclasses.things.Dice",
                     "key": name or "pair of dice",
                     "desc": desc or "Two bone knuckles with painted dots.",
                     "attr": {"number": 2}
                     },
            'pipe': {"typeclass": "typeclasses.things.Pipe",
                     "key": name or gift,
                     "desc": desc or "Smoking pipe carved with esoteric symbols.",
                     },
            'wand': {"typeclass": "typeclasses.things.Wand",
                     "key": name or gift,
                     "desc": desc or "Curiously crafted wand carved with runes: ᛑ ᛒ ᚱ",
                     },
            'bag': {"typeclass": "typeclasses.things.BagofJunk",
                    "key": name or "sack",
                    "desc": desc or bag_desc,
                    },
            'junk': {"typeclass": "typeclasses.things.BagofJunk",
                     "key": name or "sack",
                     "desc": desc or bag_desc,
                     "attr": {
                         "stuff": "human"
                     },
                     },
            'blue': {"typeclass": "typeclasses.things.Medal",
                     "key": name or "blue medal",
                     "desc": desc or "Gold medallion decorated with a trident and conch suspended by a blue ribbon.",
                     },
        }

        receiver = self.search(recipient, global_search=True)
        if not receiver:
            logger.info(f"Didn't find {recipient}.")
            return None

        if gift in gifts and \
           not receiver.attributes.get(f"received_{gift}"):
            details = gifts[gift]
            logger.info(f"Giving {details['key']} to {receiver.key}")

            obj = spawn(details)[0]
            # Store each attribute under its name; enumerate() here would
            # store it as index -> name.
            for attr, value in details.get("attr", {}).items():
                obj.attributes.add(attr, value)
            obj.location = receiver
            receiver.attributes.add(f"received_{gift}", True)

            their_name = receiver.get_display_name(self)
            self.announce_action(f"$You() $conj(give) something to {their_name}.",
                                 exclude=receiver)

            my_name = self.get_display_name(receiver)
            receiver.msg(f"{my_name} gives you a {obj.name}.")
            return True

        self.msg(f"You can't give '{gift}' to {receiver.key}... {gifts.keys()}")
        return None
|
|
|
|
class AI:
    """
    Mixin providing AI-powered (Claude/Anthropic) reasoning capabilities.
    Classes using this mixin can think, process thoughts, and maintain
    per-speaker conversation history backed by JSON files.

    Attributes read from the object:

        db.personality_file - path of the text file holding the system prompt
        db.personality      - name used in the history file names
    """

    # Messages heard since the last reply (at most ten).  This class-level
    # list is only a shared default: never mutate it in place, always
    # rebind `self.short_history`.
    short_history = []

    def pop_recent_events(self, speech):
        """
        Return the recently heard messages and forget them.

        If the newest message contains `speech` it is left out, because
        the caller passes that speech on separately.

        Returns:
            str: The remaining messages separated by blank lines, or "".
        """
        events = list(self.short_history)
        # Plain substring test: `speech` is player input and must not be
        # interpreted as a regular expression.
        if events and speech in events[-1]:
            events.pop()

        # logger.info(f"{history}")
        self.short_history = []
        return '\n\n'.join(events)

    def setting_and_backstory(self, speaker=None):
        """
        Build the system prompt.

        The prompt is the content of `db.personality_file`, followed by
        the name and description of the current location and, when a
        `speaker` is given, that speaker's gender, sdesc and description.
        Missing pieces are left out.
        """
        logger.info(f"Reading {self.db.personality_file}")
        system_prompt = Path(self.db.personality_file).read_text(encoding="utf-8")

        location = self.location
        if location is not None:
            system_prompt += "\n\n"
            system_prompt += "You are currently in " + location.key + ". "
            if location.key == "Cozy House":
                system_prompt += "This is the dwelling of the gnome, Dabbler. "
            if location.key == "Homey Hut":
                system_prompt += "This is the dwelling of the witch, Trampoli. "
            # NOTE(review): the room is read through `.desc` while the
            # speaker uses `.db.desc`; falling back to the latter here.
            room_desc = getattr(location, "desc", None) or location.db.desc
            if room_desc:
                system_prompt += "Described as " + str(room_desc)

        if speaker:
            who = " ".join(part
                           for part in (speaker.db.gender, speaker.sdesc.get())
                           if part)
            system_prompt += "\n\n"
            system_prompt += "You are talking to a " + who + ". "
            if speaker.db.desc:
                system_prompt += "Described as " + str(speaker.db.desc)
        return system_prompt

    def history_file(self, speaker):
        """
        Return the Path of the JSON history shared with `speaker`.

        The file is `.<personality>-<speaker>.json` (lower-cased, spaces
        replaced by dashes) inside `personality_dir`.
        """
        name = f"{speaker}".replace(" ", "-")
        # Keep the speaker's name from reaching outside the directory.
        name = name.replace("/", "-").replace("\\", "-")
        combo_name = f".{self.db.personality}-{name}.json".lower()
        filename = path.join(personality_dir, combo_name)
        logger.info(f"Chatbot history_file: {filename}")
        return Path(filename)

    def history(self, speaker):
        """Return the stored message list for `speaker`, or [] if none."""
        history_file = self.history_file(speaker)
        if not history_file.exists():
            return []
        return json.loads(history_file.read_text(encoding="utf-8"))

    def update_history(self, speaker, messages, reply):
        """
        Append `reply` as the assistant's message to `messages` and write
        the list to the speaker's history file, creating the directory
        when needed.
        """
        history_file = self.history_file(speaker)
        messages.append({"role": "assistant", "content": reply})
        history_file.parent.mkdir(parents=True, exist_ok=True)
        history_file.write_text(json.dumps(messages, indent=2), encoding="utf-8")

    def _think(self, system_prompt, messages):
        """
        Send the prompt and messages to Anthropic and return the reply.

        Returns:
            str: The first text found in the response, or "" if there is
            none.
        """
        # NOTE(review): this call is synchronous; presumably the game
        # waits while it is in flight - confirm this is acceptable.
        logger.info("Calling out to Anthropic...")
        client = anthropic.Anthropic()
        response = client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=240,
            system=system_prompt,
            messages=messages,
        )
        for block in response.content:
            text = getattr(block, "text", None)
            if text:
                return text
        return ""

    def think(self, speaker, speech):
        """
        Ask Claude to think of a reply to speech from speaker.
        Uses the 'system_prompt' from a personality file,
        and 'messages' from the JSON history function,
        appended with all 'events' recorded since last time.

        Returns:
            str: The reply.  It is saved to the speaker's history unless
            it is empty.
        """
        system_prompt = self.setting_and_backstory(speaker)
        messages = self.history(speaker)
        recent_events = self.pop_recent_events(speech)
        if recent_events:
            speech = f"{recent_events}\n\n{speaker.key}: {speech}"
        messages.append({"role": "user", "content": speech})

        # logger.info(f"Deep Thoughts: {system_prompt} / {messages}")
        reply = self._think(system_prompt, messages)

        # An empty assistant message would only pollute the history.
        if reply:
            self.update_history(speaker, messages, reply)
        return reply

    def process_thoughts(self, response):
        """
        Act out a reply, one paragraph every six seconds.

        A paragraph starting with '|' is executed as a command; any other
        paragraph is tidied with `fix_paragraph` and shown to everyone in
        the current location.  Empty paragraphs are skipped.
        """
        paragraphs = response.split('\n\n')

        for idx, paragraph in enumerate(paragraphs):
            m = match(r"^ *\| *(.*)", paragraph)
            if m:
                action = m.group(1)
                logger.info(f"Doing: '{action}'")
                # NOTE(review): this runs model output as a command of
                # this object; players can influence that output.
                delay(6 * idx, self.execute_cmd, action)
                continue

            spoken = fix_paragraph(paragraph)
            if not spoken or self.location is None:
                continue
            logger.info(f"Saying: '{paragraph}'")
            delay(6 * idx, self.location.msg_contents, spoken)

    def at_msg_receive(self, text=None, from_obj=None, **kwargs):
        """
        Remember what this object hears, for use in the next prompt.

        Messages from other objects are stored as "<name>: <message>",
        with the sender's sdesc as name when it has one and its key
        otherwise; messages without sender are stored as they are.
        Markup codes like `|b` are removed, empty messages and messages
        from this object itself are ignored, and only the ten newest
        entries are kept.

        Returns:
            bool: Always True.
        """
        super().at_msg_receive(text, from_obj=from_obj, **kwargs)
        logger.info(f"at_msg_receive: {text} :: {self.key}")

        if from_obj == self:
            return True

        if isinstance(text, str):
            msg = text
        else:
            msg = text[0] if text else None
        msg = sub(r'\|[a-zA-Z]', '', str(msg)) if msg else None
        if not msg:
            return True

        if from_obj:
            if hasattr(from_obj, 'sdesc') and from_obj.sdesc.get():
                name = from_obj.sdesc.get()
            else:
                name = from_obj.key
            entry = f"{name}: {msg}"
        else:
            entry = msg

        # Build a new list so the class-level default is never changed.
        self.short_history = [*self.short_history, entry][-10:]
        return True
|
|
|
|
|
|
class Recorder(Object):
    """Object for recording the events in the current room.

    Classes derived from this one should, in their msg(), call
    'record_msg', for instance:

        def msg(self, text=None, from_obj=None, session=None, **kwargs):
            self.record_msg(text, from_obj)

    Objects should set the following properties:

        @set scribe/transcripts_path = "transcripts/%Y-%m-%d.html"
        @set scribe/header_file = "transcripts/header-template.html"

    The 'transcripts_path' is a strftime pattern for the name of today's
    transcript file; a new file starts out as a copy of 'header_file'.
    """
    def capitalize_msg(self, text, msg_type=None):
        """
        If text is lowercase, capitalize it.

        Maybe prepend a 'The' to the front.

        Messages of type 'traverse' or 'teleport' get "An " or "A " in
        front instead, depending on whether the text (after an optional
        color code) starts with a lowercase vowel.
        """
        if msg_type and msg_type in ('traverse', 'teleport'):
            if match(r"^(\|[A-z])?[aeiou]", text):
                return "An " + text
            return "A " + text

        # If the line is colored and lowercase, it probably assumes a
        # character's sdesc, so let's add a 'the' to the front:
        # Do we need to only do this on 'say'?
        if match(r"^\|[mb][a-z]", text):
            return "The " + text
        if match(r"^(\|[A-z])[a-z]", text) and len(text) > 2:
            # Upper-case the first letter after the color code.
            return text[0:2] + text[2].upper() + text[3:]
        if match(r"^[a-z]", text) and len(text) > 2:
            return text[0].upper() + text[1:]
        return text

    def to_html(self, text, msg_type=None):
        """
        Convert the 'text' to an HTML formatted string.

        The characters &, < and > are escaped first, because the text
        comes from players.  Then color codes become markup, remaining
        codes are dropped and newlines become <br/>.

        Returns:
            str: A <div class="special"> for type 'look', a <blockquote>
            for type 'note', a <p> otherwise.
        """
        from html import escape

        text = escape(text, quote=False)
        # Yellow text should be italics:
        text = sub(r"\|y(.*?)\|n", '<i>\\1</i>', text)
        # Bold and tag the titles:
        text = sub(r"\|[cb](.*?)\|n", '<b class="title">\\1</b>', text)
        # Bold anything white:
        text = sub(r"\|w(.*?)\|n", '<b class="heavy">\\1</b>', text)
        # Remove the rest:
        text = sub(r"\|[A-z]", '', text)
        # Remove initial or final carriage returns:
        text = sub(r"^\n", '', text)
        text = sub(r"\n$", '', text)
        # Convert the carriage returns:
        text = sub(r"\n", ' <br/>\n', text)

        if msg_type and msg_type == 'look':
            # Slicing keeps this safe for empty text.
            text = text[:1].upper() + text[1:]
            return f"<div class=\"special\"><p>{text}</p></div>\n"
        if msg_type and msg_type == 'note':
            return f"<blockquote>{text}</blockquote>"
        return f"<p>{text}</p>\n"

    def find_actor(self, text, default=None):
        """
        Extract the character performing the action from the text.

        Returns `default` when it is given, otherwise the first text
        wrapped in `|b` ... `|n`, or None if there is none.
        """
        if default:
            return default

        m = match(r".*\|b(.*?)\|n.*", text)
        return m.group(1) if m else None

    def record_msg(self, text, actor=None):
        """
        Append a message to today's transcript file.

        Args:
            text (str or tuple): The message, or a tuple of the message
                and an options dict whose 'type' entry is the message
                type.
            actor (optional): The object that caused the message.  If it
                has `db.alt_names`, its sdesc at the start of the message
                is replaced by one of those names, chosen at random.

        Nothing is recorded while `db.transcripts_path` is not set.  After
        writing, a 'traverse' or 'teleport' message containing " arrives "
        makes this object look at the actor.
        """
        from re import escape as regex_escape

        msg_type = None
        if isinstance(text, tuple):
            options = text[1] if len(text) > 1 else None
            if options and isinstance(options, dict):
                msg_type = options.get('type')
            first = text[0] if text else ""
            msg = self.capitalize_msg(first or "", msg_type)
        else:
            msg = self.capitalize_msg(str(text))

        # Make things a little more interesting by substituting the
        # name...
        names = actor and actor.db.alt_names
        if names:
            sdesc = actor.sdesc.get()
            if sdesc:
                alt_name = choice(names)
                # Both the sdesc and the new name are taken literally.
                msg = sub(f"^{regex_escape(sdesc)}", lambda _m: alt_name,
                          msg, flags=IGNORECASE)

        actor = self.find_actor(msg, actor)
        msg = self.to_html(msg, msg_type)

        # @set shrub/transcripts_path = "transcripts/bar-%Y-%m-%d.html"
        pattern = self.db.transcripts_path
        if not pattern:
            logger.info("No transcripts_path set. Not recording.")
            return
        filename = datetime.today().strftime(pattern)

        # @set shrub/header_file = "transcripts/header-template.html"
        if not exists(filename):
            folder = dirname(filename)
            if folder:
                makedirs(folder, exist_ok=True)
            header = self.db.header_file
            if header and exists(header):
                copyfile(header, filename)
            else:
                logger.info(f"No header file for {filename}.")

        with open(filename, "a", encoding="utf-8") as myfile:
            myfile.write(msg)

        # Follow-up Actions ...
        if match(r".* arrives .*", msg) and actor and \
           msg_type in ('traverse', 'teleport'):
            self.execute_cmd(f"look {actor}")
|