Skip to content

Commit

Permalink
queuing assets to redis; banking from queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
ndenny committed May 19, 2025
1 parent f059cce commit e34dcad
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 4 deletions.
113 changes: 111 additions & 2 deletions bin/bastion.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from ruamel.yaml import YAML
from ruamel.yaml.scalarstring import PreservedScalarString

from redis import Redis

logger = logging.getLogger()
logging.basicConfig(level = logging.DEBUG)

Expand All @@ -22,7 +24,7 @@
sys.path.insert(0, str(LIB_PATH))


from Bastion.Common import Boggle, asPath, toJDN
from Bastion.Common import Boggle, asPath, toJDN, ALWAYS, RDN
from Bastion.Chronology import Quantim
from Bastion.Site import Site
from Bastion.Condo import Condex
Expand All @@ -44,6 +46,24 @@
b. anchor (full) if drift >= policy
"""

class ProtectionRequest(Request):
"""
I am the abstract base class for all requests to backup assets into a vault.
"""
@property
def ARK(self):
"""
I am the ARK of the asset being backed up.
"""
if len(self.args) == 1:
return ARK(self.args[0])
elif len(self.args) == 2:
return ARK(self.args[0], self.args[1])
elif len(self.args) == 3:
return ARK(self.args[0], self.args[1], self.args[2])
else:
raise ValueError("ARK must be given as a single argument or as two arguments (site, zone)")

class App:
CONF_SEARCH_ORDER = [
pathlib.Path('/etc/bastion'),
Expand Down Expand Up @@ -87,6 +107,17 @@ def configured(self):
self.conf.load(folder / confile)
return self

@property
def redis(self):
"""
Lazy load the redis connection.
"""
if not hasattr(self, '_redis'):
self._redis = Redis(host = self.conf.get("bastion.redis.host", "localhost"),
port = self.conf.get("bastion.redis.port", 6379),
db = self.conf.get("bastion.redis.db", 0))
return self._redis

@property
def hostname(self):
if 'host.name' in self.conf:
Expand Down Expand Up @@ -195,7 +226,10 @@ def run(self):
"list vaults declared", self.do_export_vaults_declared),

("refresh keytab", self.do_refresh_keytab),
("enroll assets", self.do_enroll_assets)
("enroll assets", self.do_enroll_assets),

("queue assets", self.do_queue_asset_updates),
("bank queued assets", self.do_bank_queued_assets)
]

#-- Look for an explicitly declared session ID in opts;
Expand Down Expand Up @@ -867,6 +901,75 @@ def do_export_update_plan(self, request):
tasks = self._generate_update_plan(request)
raise NotImplementedError

def do_queue_asset_updates(self, request):
"""
I push asset ARKs for all assets in the given site and zone.
These ARKs will be pushed to my redis instance into the queue named "bastion.{site}.{zone}.updates"
"""
if len(request.args) == 1:
#-- We were given a single ARK argument.
ark = ARK(request.args[0])
elif len(request.args) == 2:
#-- We were given two arguments (site, zone)
ark = ARK(request.args[0], request.args[1])
else:
raise ValueError("do_queue_asset_updates expects zone to be given as a CURIE'd ARK")

site = self.site(ark.site)
zone = site.zone(ark)
qname = "bastion.{}.{}.updates".format(RDN(ark.site), RDN(ark.zone))
queued = [ ]

#-- Collect all assets in the zone.
#-- I will push the ARK of each asset to the redis queue.
#-- The redis queue is named "bastion.{site}.{zone}.updates"
#-- I will use the redis queue to perform the updates in the background.
for asset in zone.assets:
#-- push the ARK to the redis queue.
#-- I will use the redis queue to perform the updates in the background.
self.redis.lpush(qname, str(asset.ARK))
queued.append(str(asset.ARK))

#-- return the list of ARKs that were pushed to the redis queue.
return request.succeeded(queued, report = "pushed {} ARKs to the redis queue {}".format(len(queued), qname))

def do_bank_queued_assets(self, request):
"""
I perform all asset updates that are queued in the redis queue "bastion.{site}.{zone}.updates"
"""
if len(request.args) == 1:
#-- We were given a single ARK argument.
ark = ARK(request.args[0])
elif len(request.args) == 2:
#-- We were given two arguments (site, zone)
ark = ARK(request.args[0], request.args[1])
else:
raise ValueError("do_perform_queued_asset_updates expects zone to be given as a CURIE'd ARK")

site = self.site(ark.site)
zone = site.zone(ark)
qname = "bastion.{}.{}.updates".format(RDN(ark.site), RDN(ark.zone))
updates = [ ]

#-- pop an ARK from the redis queue.
#-- The redis queue is named "bastion.{site}.{zone}.updates"
while ALWAYS:
slug = self.redis.lpop(qname)
if slug is None:
break
else:
logger.info("performing update for {}".format(slug))
ark = ARK(slug.decode('utf-8'))
command = "{}/bastion.py bank asset {}".format(str(BIN_PATH), str(ark))
#os.system(command)
print(command)
updates.append(str(ark))

if updates:
return request.succeeded(updates, report = "performed {} updates ({} expected)".format(len(updates), len(zone.assets)))
else:
return request.succeeded(updates, report = "no updates to perform")


if __name__ == '__main__':
app = App().configured()
Expand Down Expand Up @@ -927,5 +1030,11 @@ def do_export_update_plan(self, request):
#bastion export update plan {site} {zone}
#bastion export update plan {ark}

#-- These are intermediate operations that will be depecrated when
#-- the self performative system is fully implemented.
#bastion queue asset updates {site} {zone}
#bastion queue asset update {ark}
#bastion perform queued asset updates

#bastion perform request filed
#bastion perform request queued
7 changes: 6 additions & 1 deletion lib/Bastion/Common.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
HOUR = datetime.timedelta(hours = 1)
HOURS = HOUR


ALWAYS = True
NEVER = False
YES = True
NO = False
ON = True
OFF = False

class Thing:
def __init__(self, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion lib/Bastion/Model.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __new__(cls, *args):
site, zone, asset = args
s = RDN(site)
z = RDN(zone)
a = asset.name if isinstance(isAsset) else str(asset)
a = asset.name if isinstance(asset, isAsset) else str(asset)
st = s if (s[0] == '@') else "@{}".format(s)
return tuple.__new__(cls, [st, z, a])

Expand Down

0 comments on commit e34dcad

Please sign in to comment.