From add72decd5d9842648644b4c56f6d31cc5dc96af Mon Sep 17 00:00:00 2001 From: jaydee Date: Wed, 26 Nov 2025 08:58:16 +0100 Subject: [PATCH] build --- omv_backup_scp.py | 875 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 875 insertions(+) create mode 100755 omv_backup_scp.py diff --git a/omv_backup_scp.py b/omv_backup_scp.py new file mode 100755 index 0000000..0170046 --- /dev/null +++ b/omv_backup_scp.py @@ -0,0 +1,875 @@ +#!/myapps/venv/bin/python3 +import datetime +import logging +from paho.mqtt import client as mqtt_client +import getopt +import json +import time +import socket +import subprocess +from subprocess import Popen, PIPE, CalledProcessError +import sys +import os +import re +import platform +import requests +import fnmatch +import yaml +import paramiko +import shutil +import signal +import paho.mqtt.client as mqtt +#import numpy as np + +def signal_handler(sig, frame): + print('You pressed Ctrl+C!') + conn.close() + sys.exit(0) + + + +signal.signal(signal.SIGINT, signal_handler) +file_path = os.path.realpath(__file__) +dir_path = os.path.dirname(file_path) +VERSION="1.0.10" +# print(file_path) +# print(dir_path) +os.chdir(dir_path) +from wakeonlan import send_magic_packet +pid = os.getpid() +def is_port_open(host, port): + try: + sock = socket.create_connection((host, port)) + sock.close() + return True + except socket.error: + return False +servers = ["rpi5.home.lan","nas.home.lan","rack.home.lan","m-server.home.lan"] +host = platform.node().lower() +#input(host) +cmnd = "ps -ef|grep omv_backups.py|grep -v grep |grep -v {}|wc -l".format(pid) +status, output = subprocess.getstatusoutput(cmnd) +if int(output) > 0: + print("Running already!") + sys.exit() + +def is_port_open(host, port): + try: + sock = socket.create_connection((host, port)) + sock.close() + return True + except socket.error: + return False +s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +# doesn't even have to be reachable +conn = False +while not conn: + try: + s.connect(('192.168.77.1', 1)) + IP = s.getsockname()[0] + conn = True + except: + time.sleep(5) + +broker = 'mqtt.home.lan' +port = 1883 +topic_sum = "sectorq/amd/backups" +mqtt_username = 'jaydee' +mqtt_password = 'jaydee1' +try: + opts, args = getopt.getopt(sys.argv[1:], "hTamftDr:bd:sSOl:", ["command=", "help", "output="]) +except getopt.GetoptError as err: + #usage() + sys.exit(2) +output = None +# QJ : getopts +_MODE = "manual" +_FIRST = _TEST = _RESTORE = _BACKUP = _SYNC = _START = _STOP = _SSH_TEST = False +_EXECUTE = True +_DATE = "pick" +_LOG_LEVEL = "" +for o, a in opts: + if o == "-a": + _MODE = "auto" + elif o in ("-m", "--manual"): + _MODE = "manual" + elif o in ("-l", "--level"): + _LOG_LEVEL = a.upper() + elif o in ("-f", "--first"): + _FIRST = True + elif o in ("-d", "--date"): + _DATE = a + elif o in ("-t", "--test"): + _TEST = True + elif o in ("-s", "--sync"): + _SYNC = True + elif o in ("-S", "--start"): + _START = True + elif o in ("-O", "--stop"): + _STOP = True + elif o in ("-r", "--restore"): + _RESTORE = True + _APP = a + elif o in ("-D", "--dry"): + _EXECUTE = False + elif o in ("-T", "--dry"): + _SSH_TEST = True + elif o in ("-h", "--help"): + print(VERSION) + sys.exit() +LOG_FILE = "omv_backup.log" +if _LOG_LEVEL == "DEBUG": + logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') + logging.debug('using debug loging') +elif _LOG_LEVEL == "ERROR": + logging.basicConfig(filename=LOG_FILE, level=logging.ERROR, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') + logging.info('using error loging') +elif _LOG_LEVEL == "SCAN": + logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') + logging.info('using error loging') +else: + logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') +logging.info("script started") + +logger = logging.getLogger(__name__) +client_id = "dasdasdasd333" +# try: +# client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1, client_id) +# except: +# client = mqtt_client.Client() +# client.username_pw_set(mqtt_username, mqtt_password) + + + + + +logging.info(f"Start OMV Backup") +BROKER = 'mqtt.home.lan' # e.g., 'mqtt.example.com' +PORT = 1883 # Typically 8883 for secure MQTT +TOPIC = 'sectorq/backups/start' +USERNAME = 'jaydee' +PASSWORD = 'jaydee1' +USE_TLS = False # Set to False if not using TLS +backups = { + "nas": { + "login": "admin@nas.home.lan", + "jobs": { + "github": + {"source":"/share/Data/__GITHUB", + "exclude":"", + "active": True + }, + "photo": { + "source":"/share/Photo/Years", + "exclude":"", + "active":True + } + } + }, + "m-server":{ + "login": "root@m-server.home.lan", + "jobs": { + "docker_data":{ + "source":"/share/docker_data/", + "exclude":"", + "active":True + }, + "fail2ban":{ + "source":"/etc/fail2ban/", + "exclude":"", + "active":True + } + } + }, + "rpi5.home.lan":{ + "docker_data":{ + "source":"/share/docker_data/", + "exclude":"", + "active":True + }, + "fail2ban":{ + "source":"/etc/fail2ban/", + "exclude":"", + "active":True + } + } +} + +BACKUP_FS = "/media/backup/" +BACKUP_HOST = "amd.home.lan" +#BACKUP_HOST = "morefine.home.lan" + +logging.info("Test connection") +while True: + try: + hm = socket.gethostbyaddr(BACKUP_HOST) + break + except socket.gaierror: + logging.info("Waiting for backup host...") + time.sleep(5) + + +logging.info(_RESTORE) +def send_mqtt_message(topic,msg,qos=0,retain=False): + client2 = mqtt.Client() + client2.username_pw_set("jaydee", "jaydee1") + try: + client2.connect("mqtt.home.lan",1883,60) + client2.publish(topic, json.dumps(msg), qos=qos, retain=retain) + client2.disconnect() + logging.info(f"Message1 sent {topic}, {msg}") + except ValueError as e: + logging.error("Failed to send") + print("Failed to send") + print(e) + +if _SYNC: + containers = ["HomeAssistant","webhub-web-1","heimdall","pihole","mosquitto-mosquitto-1","mailu3-redis-1","mailu3-webmail-1","mailu3-resolver-1","mailu3-antispam-1","mailu3-webdav-1","mailu3-smtp-1","mailu3-oletools-1","mailu3-front-1","mailu3-fetchmail-1","mailu3-imap-1","matter-server","piper-en","openwakeword","whisper-en","auth-worker-1","auth-server-1","auth-authentik_ldap-1","auth-redis-1","auth-postgresql-1","nginx-app-1"] + + cmnd = f"curl -H 'Authorization: Bearer l4c1j4yd33Du5lo' 192.168.77.238:8094/v1/update" + logging.info(cmnd) + status, output = subprocess.getstatusoutput(cmnd) + + if _START: + for c in containers: + cmnd = f"docker start {c}" + print(cmnd) + status, output = subprocess.getstatusoutput(cmnd) +if _STOP: + cmnd = "docker ps" + status, running_containers = subprocess.getstatusoutput(cmnd) + logging.info(running_containers) + for c in running_containers.splitlines(): + print(c.split()[-1]) + if c.split()[-1] == "watchtower-watchtower-1": + continue + cmnd = f"docker stop {c.split()[-1]}" + status, running_containers = subprocess.getstatusoutput(cmnd) +def restore_job(_APP): + #global VERSION + logging.info("Starting Restore") + print(f"Starting restore : {VERSION}") + now = datetime.datetime.now() + STARTTIME = now.strftime("%Y-%m-%d_%H:%M:%S") + _DATE = "pick" + if _APP == "all": + _DATE = "latest" + if host == "rpi5.home.lan" or host == "rpi5": + _APP = ["__backups", "nginx","ha","gitea","gitlab","mailu","bitwarden","esphome","grafana","ingluxdb","kestra","matter-server","mosquitto","octoprint","octoprint2","pihole","unify_block","webhub","homepage","watchtower"] + else: + cmnd = "ssh root@amd.home.lan 'ls /mnt/raid/backup/m-server/docker_data/latest'" + status, output = subprocess.getstatusoutput(cmnd) + _APP = output.splitlines() + logging.info(_APP) + #input("????") + else: + _APP = _APP.split(",") + + PROGRESS = 0 + topic = "sectorq/amd/restore" + step = 100 / len(_APP) + for app in _APP: + #msg = {"mode":_MODE, "status":"restore","bak_name":"Restore","host":host,"cur_job":app,"start_time":STARTTIME,"end_time":"","progress":str(round(np.ceil(PROGRESS))) + "%","finished":1,"used_space":1} + msg = {"mode":_MODE, "status":"restore","bak_name":"Restore","host":host,"cur_job":app,"start_time":STARTTIME,"end_time":"","progress":str(round(PROGRESS,0)) + "%","finished":1,"used_space":1} + logging.info(msg) + + + send_mqtt_message(topic,msg) + PROGRESS = PROGRESS + step + now = datetime.datetime.now() + DATETIME = now.strftime("%Y-%m-%d_%H-%M-%S") + BACKUP_HOST = f"root@amd.home.lan" + BACKUP_DEVICE = "/mnt/raid" + BACKUP_DIR = f"/backup/{host}" + + + if _DATE == "pick": + cmnd = f"ssh root@amd.home.lan 'ls {BACKUP_DEVICE}/backup/m-server/docker_data'" + status, output = subprocess.getstatusoutput(cmnd) + # print(output) + dates = output.splitlines() + n = 1 + for i in dates: + print(f"{n} - {i}" ) + n += 1 + + ans = input("Pick a backup to restore : ") + _DATE = dates[int(ans) - 1] + + + if app == "fail2ban": + logging.info("?>?????") + NEW_BACKUP_DIR = f"/backup/m-server/fail2ban/{_DATE}/" + SOURCE_DIR = f"/etc/fail2ban" + else: + NEW_BACKUP_DIR = f"/backup/m-server/docker_data/{_DATE}/{app}" + SOURCE_DIR = f"/share/docker_data/" + if _FIRST: + BACKUP_PATH="{}/initial".format(BACKUP_DIR) + else: + BACKUP_PATH="{}/{}".format(BACKUP_DIR, DATETIME) + LATEST_LINK="{}/{}".format(BACKUP_DIR,_DATE) + FULL_BACKUP_LATEST = f"{NEW_BACKUP_DIR}/{_DATE}" + LATEST_LINK = f"/{host}/{app}/{_DATE}" + + logging.info("Create backup dir") + #logging.info(cmnd) + + + #cmnd = "rsync -av --delete {}/ --link-dest {} --exclude=\".cache\" {}".format(SOURCE_DIR, LATEST_LINK, BACKUP_PATH) + + + if app == "heimdall": + logging.info("Stopping docker") + cmnd = "docker stop heimdall" + status, output = subprocess.getstatusoutput(cmnd) + cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}" + ans = "y" + logging.info(cmnd) + logging.info("Sync files") + if _TEST: + ans = input("continue?") or "n" + if ans == "y" and _EXECUTE: + status, output = subprocess.getstatusoutput(cmnd) + + + entries = ["Home Assistant","Nginx Proxy Manager","Portainer","Roundcube","Authentik","Kestra"] + for e in entries: + cmnd = f"sqlite3 /share/docker_data/heimdall/config/www/app.sqlite \"SELECT url FROM items WHERE title = '{e}'\"" + logging.info(cmnd) + status, output = subprocess.getstatusoutput(cmnd) + regex = re.compile(r'[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}') + contents = re.sub(regex, IP , output) + cmnd = f"sqlite3 /share/docker_data/heimdall/config/www/app.sqlite \"UPDATE items SET url = '{contents}' WHERE title = '{e}'\"" + logging.info(cmnd) + status, output = subprocess.getstatusoutput(cmnd) + # cmnd = "docker start heimdall" + # status, output = subprocess.getstatusoutput(cmnd) + + + + + if app == "ha": + logging.info("Stopping docker") + cmnd = "docker stop heimdall" + status, output = subprocess.getstatusoutput(cmnd) + cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}" + ans = "y" + logging.info(cmnd) + logging.info("Sync files") + if _TEST: + ans = input("continue?") or "n" + if ans == "y" and _EXECUTE: + status, output = subprocess.getstatusoutput(cmnd) + logging.info("Start docker") + # cmnd = "docker start heimdall" + # status, output = subprocess.getstatusoutput(cmnd) + elif app == "fail2ban": + logging.info("Stopping docker") + cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}" + ans = "y" + logging.info(cmnd) + logging.info("Sync files") + if _TEST: + ans = input("continue?") or "n" + if ans == "y" and _EXECUTE: + status, output = subprocess.getstatusoutput(cmnd) + logging.info("Start docker") + # cmnd = "docker start heimdall" + # status, output = subprocess.getstatusoutput(cmnd) + elif app == "homepage": + logging.info("Stopping docker") + cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}" + ans = "y" + logging.info(cmnd) + + if _TEST: + ans = input("continue?") or "n" + if ans == "y" and _EXECUTE: + status, output = subprocess.getstatusoutput(cmnd) + + file = "/share/docker_data/homepage/config/widgets.yaml" + with open(file, 'r') as stream: + try: + loaded = yaml.load(stream, Loader=yaml.FullLoader) + except yaml.YAMLError as exc: + logging.info(exc) + + # Modify the fields from the dict + #loaded['logo']['icon'] = "/images/morefine2.png" + logging.info(json.dumps(loaded, indent=2)) + i = 0 + + + for y in loaded: + logging.info(i) + logging.info(y) + + if "logo" in y: + if host == "rpi5.home.lan" or host == "rpi5": + loaded[i]['logo']['icon'] = "/images/rpi5.png" + elif host == "nas.home.lan": + loaded[i]['logo']['icon'] = "/images/qnap_nas.png" + elif host == "rack.home.lan": + loaded[i]['logo']['icon'] = "/images/rack.png" + else: + loaded[i]['logo']['icon'] = "/images/morefine2.png" + i+=1 + + # Save it again + logging.info(f"writing to file {file}") + with open(file, 'w') as stream: + try: + yaml.dump(loaded, stream, default_flow_style=False) + except yaml.YAMLError as exc: + print("failed") + print(exc) + + + + logging.info("Start docker") + # cmnd = "docker start heimdall" + # status, output = subprocess.getstatusoutput(cmnd) + elif app == "nginx1": + logging.info("Stopping docker") + cmnd = "docker stop nginx-app-1" + status, output = subprocess.getstatusoutput(cmnd) + cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}" + ans = "y" + logging.info(cmnd) + logging.info("Sync files") + if _TEST: + ans = input("continue?") or "n" + if ans == "y" and _EXECUTE: + status, output = subprocess.getstatusoutput(cmnd) + domains = ["sectorq.eu","gitlab.sectorq.eu","ha.sectorq.eu","mail.sectorq.eu","pw.sectorq.eu","semaphore.sectorq.eu","kestra.sectorq.eu","auth.sectorq.eu"] + for d in domains: + cmnd = f'sqlite3 /share/docker_data/nginx/data/database.sqlite "UPDATE proxy_host SET forward_host = \'{IP}\' WHERE domain_names = \'[\\"{d}\\"]\'"' + logging.info(cmnd) + status, output = subprocess.getstatusoutput(cmnd) + + cmnd = 'egrep -l "# bazarr.sectorq.eu|# gitea.sectorq.eu|# jf.sectorq.eu|# kestra.sectorq.eu|# auth.sectorq.eu|# ha.sectorq.eu|# pw.sectorq.eu|# semaphore.sectorq.eu|# sectorq.eu|# gitlab.sectorq.eu|# ha.sectorq.eu" /share/docker_data/nginx/data/nginx/proxy_host/*' + status, output = subprocess.getstatusoutput(cmnd) + logging.info(output.splitlines()) + for file in output.splitlines(): + logging.info(file) + f = open(file) + contents = f.read() + f.close() + regex = re.compile(r'\n\s+set \$server\s+\"\w+.\w+.\w+.\w+\";') + contents = re.sub(regex, f'\n set $server \"{IP}\";', contents) + #print(contents) + logging.info(regex) + f = open(file, "w") + contents = f.write(contents) + f.close() + status, output = subprocess.getstatusoutput(cmnd) + logging.info("Starting docker") + # cmnd = "docker start nginx-app-1" + # status, output = subprocess.getstatusoutput(cmnd) + else: + cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}" + ans = "y" + logging.info(cmnd) + logging.info("Sync files") + if _TEST: + ans = input("continue?") or "n" + if ans == "y" and _EXECUTE: + status, output = subprocess.getstatusoutput(cmnd) + now = datetime.datetime.now() + ENDJOB = now.strftime("%Y-%m-%d_%H:%M:%S") + logging.info("Sending finished status") + + msg = {"mode":_MODE, "status":"restore","bak_name":"Restore","host":host,"cur_job":app,"start_time":STARTTIME,"end_time":"","progress":100,"finished":ENDJOB,"used_space":1} + logging.info(msg) + send_mqtt_message(topic,msg) + if _MODE == "auto": + cmnd = "ssh root@amd.home.lan 'systemctl suspend &'" + status, output = subprocess.getstatusoutput(cmnd) + +def backup_job(pl): + client2 = mqtt.Client() + client2.username_pw_set("jaydee", "jaydee1") + client2.connect("mqtt.home.lan",1883,60) + if "log" in pl: + if pl["log"] == "debug": + logging.info(f'Debug enabled') + LOG_FILE = "omv_backup.log" + logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') + logging.info(f'starting backup job') + + + + + server = pl["host"] + if pl["mode"] == "dry": + _DRYRUN = True + logging.info("Dry run active") + else: + _DRYRUN = False + logging.info("Full mode active") + + finished = [] + sub_finished = [] + now = datetime.datetime.now() + STARTTIME = now.strftime("%Y-%m-%d_%H:%M:%S") + topic = "sectorq/amd/restore" + msg = {"mode":"restore", "status":"restore","bak_name":"s","host":0,"cur_job":"aaa","start_time":1,"end_time":1,"progress":0,"finished":0,"used_space":0} + + client2.publish(topic, json.dumps(msg),qos=0, retain=True) + #client2.publish(topic, msg) + topic = "sectorq/amd/backups" + msg = {"mode":_MODE, "status":"started","bak_name":"complete","host":"","cur_job":"","start_time":STARTTIME,"end_time":"in progress","progress":0,"finished":",".join(finished)} + client2.publish(topic, json.dumps(msg),qos=0, retain=True) + + # iterate over files in + # that directory + + + host = server + logging.info("Backup") + for b in backups[host]["jobs"]: + + if not backups[host]["jobs"][b]["active"]: + logging.info("Backup {} is not active!".format(b)) + msg = {"status":"inactive","bak_name":b,"start_time":"inactive","end_time":"inactive","progress":0} + client2.publish(topic, json.dumps(msg),qos=0, retain=True) + continue + + SOURCE_DIR = backups[host]["jobs"][b]["source"] + now = datetime.datetime.now() + BACKUP_HOST = backups[host]["login"] + BACKUP_DEVICE = "/mnt/raid" + BACKUP_DIR = f"{BACKUP_HOST}:{SOURCE_DIR}" + BACKUP_ROOT = f"{BACKUP_DEVICE}/backup/{host}/{b}" + DATETIME = now.strftime("%Y-%m-%d_%H-%M-%S") + + if _FIRST: + NEW_BACKUP_DIR = f"{BACKUP_ROOT}/initial" + else: + NEW_BACKUP_DIR = f"{BACKUP_ROOT}/{DATETIME}_running" + + FULL_BACKUP_LATEST = f"{BACKUP_ROOT}/latest" + + # msg = {"status":"started","bak_name":b,"start_time":DATETIME,"end_time":"in progress", "progress":0} + msg = {"mode":_MODE, "status":"started","bak_name":"complete","host":host,"cur_job":b,"start_time":STARTTIME,"end_time":"in progress","progress":0,"finished":",".join(finished)} + + client2.publish(topic, json.dumps(msg),qos=0, retain=True) + + + + cmnd = "mkdir -p " + NEW_BACKUP_DIR + logging.info(cmnd) + + if _EXECUTE: + status, output = subprocess.getstatusoutput(cmnd) + logging.info(output) + logging.info(status) + logging.info("Create backup dir") + + cmnd = f"ssh {BACKUP_HOST} 'ls {SOURCE_DIR}'" + logger.debug(cmnd) + status, output = subprocess.getstatusoutput(cmnd) + logger.debug(output) + apps = output.splitlines() + c = len(apps) + print(apps) + print(len(apps)) + + step = round(100 / c,1) + progress = 0 + cmd = ['rsync', '-avz', '--delete', BACKUP_DIR, '--link-dest', FULL_BACKUP_LATEST, '--exclude-from=/myapps/exclude.txt', NEW_BACKUP_DIR] + logging.info(" ".join(cmd)) + topic = "sectorq/amd/backups" + + if not _DRYRUN: + process = subprocess.Popen(cmd, + stdout=subprocess.PIPE) + while process.poll() is None: + line = process.stdout.readline().decode("utf-8").split("/") + #print(line[0]) + if line[0] in apps: + logging.info(f"Working on app {line[0]}") + while True: + if line[0] != apps[0]: + del apps[0] + progress = progress + step + else: + break + apps.remove(line[0]) + sub_finished.append(line[0]) + msg = {"mode":_MODE, "status":"started","bak_name":"complete","host":host,"cur_job":b,"sub":line[0],"start_time":STARTTIME,"end_time":"in progress","progress":str(round(progress)) + "%","finished":",".join(finished),"sub_finished":",".join(sub_finished)} + logging.info(f"Sending message with topic {topic} {json.dumps(msg)}") + if not "gitea-runner" == line[0]: + client2.publish(topic, json.dumps(msg),qos=0, retain=False) + progress = progress + step + + cmnd = f"rm -rf {FULL_BACKUP_LATEST}" + + #logging.info(cmnd) + logging.info("Removing latest link") + # input("????") + if not _DRYRUN: + status, output = subprocess.getstatusoutput(cmnd) + if _FIRST: + cmnd = f"cd {BACKUP_ROOT}; ln -s initial latest" + else: + cmnd = f"cd {BACKUP_ROOT}; mv {DATETIME}_running {DATETIME};ln -s {DATETIME} latest" + logging.info("Creating new latest link") + #print(cmnd) + # input("????") + if not _DRYRUN: + status, output = subprocess.getstatusoutput(cmnd) + + #Remove old + logging.info("Removing old dirs") + + cmnd = f"ls {BACKUP_ROOT}" + + if not _DRYRUN: + status, output = subprocess.getstatusoutput(cmnd) + for f in output.splitlines(): + pattern = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}-[0-9]{2}-[0-9]{2}$" # regex pattern: string starts with 'abc' + # logging.info(f"Checking {f}") + if re.match(pattern, f): + # logging.info("Match!") + dt = datetime.datetime.strptime(f, "%Y-%m-%d_%H-%M-%S") + epoch_time = int(dt.timestamp()) + + now_epoch = int(datetime.datetime.now().timestamp()) + x = now_epoch - epoch_time + # logging.info(epoch_time) # Output: 45 + if x > 2592000: + dir_path = f"{BACKUP_ROOT}/{f}" + logging.info(f"removing {dir_path}") + shutil.rmtree(dir_path) + else: + print("No match.") + if not _DRYRUN: + logging.info(f"Clearing multiple days") + multiple_per_day = {} + to_remove = [] + for f in output.splitlines(): + pattern = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}-[0-9]{2}-[0-9]{2}$" # regex pattern: string starts with 'abc' + + if re.match(pattern, f): + cday = f.split("_")[0] + if cday in multiple_per_day: + multiple_per_day[cday].append(f) + else: + multiple_per_day[cday] = [f] + + + + + # # logging.info("Match!") + # dt = datetime.datetime.strptime(f, "%Y-%m-%d_%H-%M-%S") + # epoch_time = int(dt.timestamp()) + # now_epoch = int(datetime.datetime.now().timestamp()) + + + # x = now_epoch - epoch_time + # # logging.info(epoch_time) # Output: 45 + # if x > 2592000: + # dir_path = f"{BACKUP_ROOT}/{f}" + # logging.info(f"removing {dir_path}") + # shutil.rmtree(dir_path) + else: + print("No match.") + logging.info(f"Clearing multiple days: {multiple_per_day}") + for f in multiple_per_day: + logging.info(f"Looping multiple_per_day : {f}") + if len(multiple_per_day[f]) > 1: + last = multiple_per_day[f][-1] + multiple_per_day[f].pop() + logging.info(f"Last from day: {last}") + for d in multiple_per_day[f]: + logging.info(f"Looping multiple_per_day : {f} : {d}") + dir_path = f"{BACKUP_ROOT}/{d}" + logging.info(f"removing {dir_path}") + shutil.rmtree(dir_path) + + + cmnd = f"ls {BACKUP_ROOT}|grep _running" + logging.info(f"removing obsolete dirs") + if not _DRYRUN: + status, output = subprocess.getstatusoutput(cmnd) + for f in output.splitlines(): + dir_path = f"{BACKUP_ROOT}/{f}" + logging.info(f"removing {dir_path}") + shutil.rmtree(dir_path) + + now = datetime.datetime.now() + ENDTIME = now.strftime("%Y-%m-%d_%H:%M:%S") + #msg = {"status":"finished","bak_name":b,"start_time":DATETIME,"end_time":ENDTIME,"progress":0} + finished.append(b) + msg = {"mode":_MODE, "status":"finished","bak_name":"complete","host":host,"cur_job":b,"start_time":ENDTIME,"end_time":"in progress","progress":0,"finished":",".join(finished)} + client2.publish(topic, json.dumps(msg),qos=0, retain=True) + + logging.info("Getting size of FS") + cmnd = "df -h /mnt/raid|awk '{ print $3 }'|tail -1" + logging.info(cmnd) + status, output = subprocess.getstatusoutput(cmnd) + used_space = (output.split())[0] + now = datetime.datetime.now() + ENDJOB = now.strftime("%Y-%m-%d_%H:%M:%S") + logging.info("Size : {}".format(used_space)) + logging.info("Sending finished status") + #msg = {"mode":_MODE,"status":"finished","bak_name":"complete","start_time":STARTTIME,"end_time":ENDJOB,"progress":0,"used_space":used_space} + msg = {"mode":_MODE, "status":"finished","bak_name":"complete","host":host,"cur_job":b,"start_time":STARTTIME,"end_time":ENDTIME,"progress":0,"finished":",".join(finished),"used_space":used_space} + logging.info(msg) + + client2.publish(topic, json.dumps(msg),qos=0, retain=True) + topic = "sectorq/backups/start" + logging.info(f"LALA : {topic}") + client2.publish(topic, "finished",qos=0, retain=True) + time.sleep(1) + client2.publish(topic, "finished2",qos=0, retain=True) + client2.disconnect() + #return "finished" + + if _DRYRUN: + return + topic = "sectorq/amd/restore" + for s in servers: + logging.info(f"Restoring {s}") + #if s != "rack.home.lan": + if s == "m-server.home.lan": + continue + elif s == "nas.home.lan": + user = "admin" + cmnd = "/share/Data/python/bin/python3 /share/Data/__GITLAB/omv_backup/omv_backup.py -r all" + else: + user = "jd" + cmnd = "sudo /myapps/omv_backup.py -r all" + msg = {"mode":_MODE, "status":"restore","bak_name":"s","host":s,"cur_job":"aaa","start_time":1,"end_time":1,"progress":0,"finished":1,"used_space":1} + #logging.info(msg) + + send_mqtt_message(topic,msg) + #continue + if is_port_open(s,22): + ssh = paramiko.SSHClient() + ssh.load_system_host_keys() + # Add SSH host key automatically if needed. + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + # Connect to router using username/password authentication. + logger.info(f"Sync {s}") + print(f"Sync {s}") + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + pkey = paramiko.RSAKey.from_private_key_file("/home/jd/.ssh/id_rsa") + ssh.connect(s, + username=user, + look_for_keys=False, + allow_agent=False, + pkey=pkey) + print(cmnd) + ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(cmnd) + for line in iter(ssh_stdout.readline, ""): + logger.info(line) + print(line, end="") + for line in iter(ssh_stderr.readline, ""): + logger.info(line) + ssh.close() + try: + os.remove("/backups/restore") + except: + pass + + return "finished1" + + +if _RESTORE: + restore_job(_APP) + sys.exit() +if _SSH_TEST: + user = "root" + cmnd = "ls -la" + topic = "sectorq/amd/backups" + for s in servers: + # if s == "m-server.home.lan": + # continue + # elif s == "nas.home.lan": + # user = "admin" + # cmnd = "/share/Data/__GITLAB/omv_backup/venv/bin/python3 /share/Data/__GITLAB/omv_backup/omv_backup.py -r all" + msg = {"mode":_MODE, "status":"restore","bak_name":"s","host":s,"cur_job":"aaa","start_time":1,"end_time":1,"progress":0,"finished":1,"used_space":1} + #logging.info(msg) + + send_mqtt_message(topic,msg) + if s != "rack.home.lan": + continue + if is_port_open(s,22): + ssh = paramiko.SSHClient() + ssh.load_system_host_keys() + # Add SSH host key automatically if needed. + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + # Connect to router using username/password authentication. + logger.info(f"Sync {s}") + print(f"Sync {s}") + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + pkey = paramiko.RSAKey.from_private_key_file("/home/jd/.ssh/id_rsa") + ssh.connect(s, + username=user, + look_for_keys=False, + allow_agent=False, + pkey=pkey) + print(cmnd) + ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(cmnd) + for line in iter(ssh_stdout.readline, ""): + logger.info(line) + print(line, end="") + for line in iter(ssh_stderr.readline, ""): + logger.info(line) + ssh.close() + +# Define actions based on payload +def handle_payload(payload): + try: + pl = json.loads(payload) + except: + pl = payload + logging.debug(pl) + + if "host" in pl: + if pl["host"] == 'm-server': + logging.info("💡 Starting backup job") + backup_job(pl) + logging.info(f"💡 Finished backup job") + elif pl["host"] == 'nas': + logging.info("💡 Starting backup job") + backup_job(pl) + logging.info(f"💡 Finished backup job") + else: + logging.error(f"⚠️ Unknown command: {pl}") + else: + logging.error(f"⚠️ Wrong payload: {pl}") +# Callback when connected +def on_connect(client, userdata, flags, rc): + if rc == 0: + logging.info("✅ Connected securely to broker") + client.subscribe(TOPIC) + logging.info(f"📡 Subscribed to topic: {TOPIC}") + else: + logging.error(f"❌ Connection failed with code {rc}") + +# Callback when a message is received +def on_message(client, userdata, msg): + payload = msg.payload.decode() + logging.info(f"📨 Received message: {payload} on topic: {msg.topic}") + handle_payload(payload) + +# MQTT client setup +client = mqtt.Client() +client.username_pw_set(USERNAME, PASSWORD) +client.on_connect = on_connect +client.on_message = on_message + +# Use TLS for encrypted connection +if USE_TLS: + client.tls_set() # You can customize this with certs if needed + +# Connect and loop forever +client.connect(BROKER, PORT, keepalive=60) + + +client.publish("sectorq/backups/start", "finished", qos=0, retain=True) +client.loop_forever() \ No newline at end of file