From e34dcadf08549614a01dc6e9d2775894b6cb1c42 Mon Sep 17 00:00:00 2001 From: Nathan Denny Date: Mon, 19 May 2025 14:22:21 -0400 Subject: [PATCH] queuing assets to redis; banking from queue. --- bin/bastion.py | 113 +++++++++++++++++++++++++++++++++++++++++- lib/Bastion/Common.py | 7 ++- lib/Bastion/Model.py | 2 +- 3 files changed, 118 insertions(+), 4 deletions(-) diff --git a/bin/bastion.py b/bin/bastion.py index a64283a..305c575 100755 --- a/bin/bastion.py +++ b/bin/bastion.py @@ -10,6 +10,8 @@ from ruamel.yaml import YAML from ruamel.yaml.scalarstring import PreservedScalarString +from redis import Redis + logger = logging.getLogger() logging.basicConfig(level = logging.DEBUG) @@ -22,7 +24,7 @@ sys.path.insert(0, str(LIB_PATH)) -from Bastion.Common import Boggle, asPath, toJDN +from Bastion.Common import Boggle, asPath, toJDN, ALWAYS, RDN from Bastion.Chronology import Quantim from Bastion.Site import Site from Bastion.Condo import Condex @@ -44,6 +46,24 @@ b. anchor (full) if drift >= policy """ +class ProtectionRequest(Request): + """ + I am the abstract base class for all requests to backup assets into a vault. + """ + @property + def ARK(self): + """ + I am the ARK of the asset being backed up. + """ + if len(self.args) == 1: + return ARK(self.args[0]) + elif len(self.args) == 2: + return ARK(self.args[0], self.args[1]) + elif len(self.args) == 3: + return ARK(self.args[0], self.args[1], self.args[2]) + else: + raise ValueError("ARK must be given as a single argument or as two arguments (site, zone)") + class App: CONF_SEARCH_ORDER = [ pathlib.Path('/etc/bastion'), @@ -87,6 +107,17 @@ def configured(self): self.conf.load(folder / confile) return self + @property + def redis(self): + """ + Lazy load the redis connection. + """ + if not hasattr(self, '_redis'): + self._redis = Redis(host = self.conf.get("bastion.redis.host", "localhost"), + port = self.conf.get("bastion.redis.port", 6379), + db = self.conf.get("bastion.redis.db", 0)) + return self._redis + @property def hostname(self): if 'host.name' in self.conf: @@ -195,7 +226,10 @@ def run(self): "list vaults declared", self.do_export_vaults_declared), ("refresh keytab", self.do_refresh_keytab), - ("enroll assets", self.do_enroll_assets) + ("enroll assets", self.do_enroll_assets), + + ("queue assets", self.do_queue_asset_updates), + ("bank queued assets", self.do_bank_queued_assets) ] #-- Look for an explicitly declared session ID in opts; @@ -867,6 +901,75 @@ def do_export_update_plan(self, request): tasks = self._generate_update_plan(request) raise NotImplementedError + def do_queue_asset_updates(self, request): + """ + I push asset ARKs for all assets in the given site and zone. + These ARKs will be pushed to my redis instance into the queue named "bastion.{site}.{zone}.updates" + """ + if len(request.args) == 1: + #-- We were given a single ARK argument. + ark = ARK(request.args[0]) + elif len(request.args) == 2: + #-- We were given two arguments (site, zone) + ark = ARK(request.args[0], request.args[1]) + else: + raise ValueError("do_queue_asset_updates expects zone to be given as a CURIE'd ARK") + + site = self.site(ark.site) + zone = site.zone(ark) + qname = "bastion.{}.{}.updates".format(RDN(ark.site), RDN(ark.zone)) + queued = [ ] + + #-- Collect all assets in the zone. + #-- I will push the ARK of each asset to the redis queue. + #-- The redis queue is named "bastion.{site}.{zone}.updates" + #-- I will use the redis queue to perform the updates in the background. + for asset in zone.assets: + #-- push the ARK to the redis queue. + #-- I will use the redis queue to perform the updates in the background. + self.redis.lpush(qname, str(asset.ARK)) + queued.append(str(asset.ARK)) + + #-- return the list of ARKs that were pushed to the redis queue. + return request.succeeded(queued, report = "pushed {} ARKs to the redis queue {}".format(len(queued), qname)) + + def do_bank_queued_assets(self, request): + """ + I perform all asset updates that are queued in the redis queue "bastion.{site}.{zone}.updates" + """ + if len(request.args) == 1: + #-- We were given a single ARK argument. + ark = ARK(request.args[0]) + elif len(request.args) == 2: + #-- We were given two arguments (site, zone) + ark = ARK(request.args[0], request.args[1]) + else: + raise ValueError("do_perform_queued_asset_updates expects zone to be given as a CURIE'd ARK") + + site = self.site(ark.site) + zone = site.zone(ark) + qname = "bastion.{}.{}.updates".format(RDN(ark.site), RDN(ark.zone)) + updates = [ ] + + #-- pop an ARK from the redis queue. + #-- The redis queue is named "bastion.{site}.{zone}.updates" + while ALWAYS: + slug = self.redis.lpop(qname) + if slug is None: + break + else: + logger.info("performing update for {}".format(slug)) + ark = ARK(slug.decode('utf-8')) + command = "{}/bastion.py bank asset {}".format(str(BIN_PATH), str(ark)) + #os.system(command) + print(command) + updates.append(str(ark)) + + if updates: + return request.succeeded(updates, report = "performed {} updates ({} expected)".format(len(updates), len(zone.assets))) + else: + return request.succeeded(updates, report = "no updates to perform") + if __name__ == '__main__': app = App().configured() @@ -927,5 +1030,11 @@ def do_export_update_plan(self, request): #bastion export update plan {site} {zone} #bastion export update plan {ark} +#-- These are intermediate operations that will be depecrated when +#-- the self performative system is fully implemented. +#bastion queue asset updates {site} {zone} +#bastion queue asset update {ark} +#bastion perform queued asset updates + #bastion perform request filed #bastion perform request queued diff --git a/lib/Bastion/Common.py b/lib/Bastion/Common.py index 2f0b04d..7da7b2f 100644 --- a/lib/Bastion/Common.py +++ b/lib/Bastion/Common.py @@ -27,7 +27,12 @@ HOUR = datetime.timedelta(hours = 1) HOURS = HOUR - +ALWAYS = True +NEVER = False +YES = True +NO = False +ON = True +OFF = False class Thing: def __init__(self, **kwargs): diff --git a/lib/Bastion/Model.py b/lib/Bastion/Model.py index 275e7f2..3c42392 100644 --- a/lib/Bastion/Model.py +++ b/lib/Bastion/Model.py @@ -45,7 +45,7 @@ def __new__(cls, *args): site, zone, asset = args s = RDN(site) z = RDN(zone) - a = asset.name if isinstance(isAsset) else str(asset) + a = asset.name if isinstance(asset, isAsset) else str(asset) st = s if (s[0] == '@') else "@{}".format(s) return tuple.__new__(cls, [st, z, a])