#!/myapps/venv/bin/python3 import datetime import logging from paho.mqtt import client as mqtt_client import getopt import json import time import socket import subprocess from subprocess import Popen, PIPE, CalledProcessError import sys import os import re import platform import requests import fnmatch import yaml import paramiko import shutil import signal import paho.mqtt.client as mqtt #import numpy as np def signal_handler(sig, frame): print('You pressed Ctrl+C!') conn.close() sys.exit(0) signal.signal(signal.SIGINT, signal_handler) file_path = os.path.realpath(__file__) dir_path = os.path.dirname(file_path) VERSION="1.0.9" # print(file_path) # print(dir_path) os.chdir(dir_path) from wakeonlan import send_magic_packet pid = os.getpid() def is_port_open(host, port): try: sock = socket.create_connection((host, port)) sock.close() return True except socket.error: return False servers = ["rpi5.home.lan","nas.home.lan","rack.home.lan","m-server.home.lan"] host = platform.node().lower() #input(host) cmnd = "ps -ef|grep omv_backups.py|grep -v grep |grep -v {}|wc -l".format(pid) status, output = subprocess.getstatusoutput(cmnd) if int(output) > 0: print("Running already!") sys.exit() def is_port_open(host, port): try: sock = socket.create_connection((host, port)) sock.close() return True except socket.error: return False s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # doesn't even have to be reachable conn = False while not conn: try: s.connect(('192.168.77.1', 1)) IP = s.getsockname()[0] conn = True except: time.sleep(5) broker = 'mqtt.home.lan' port = 1883 topic_sum = "sectorq/amd/backups" mqtt_username = 'jaydee' mqtt_password = 'jaydee1' try: opts, args = getopt.getopt(sys.argv[1:], "hTamftDr:bd:sSOl:", ["command=", "help", "output="]) except getopt.GetoptError as err: #usage() sys.exit(2) output = None # QJ : getopts _MODE = "manual" _FIRST = _TEST = _RESTORE = _BACKUP = _SYNC = _START = _STOP = _SSH_TEST = False _EXECUTE = True _DATE = "pick" _LOG_LEVEL = "" for o, a in opts: if o == "-a": _MODE = "auto" elif o in ("-m", "--manual"): _MODE = "manual" elif o in ("-l", "--level"): _LOG_LEVEL = a.upper() elif o in ("-f", "--first"): _FIRST = True elif o in ("-d", "--date"): _DATE = a elif o in ("-t", "--test"): _TEST = True elif o in ("-s", "--sync"): _SYNC = True elif o in ("-S", "--start"): _START = True elif o in ("-O", "--stop"): _STOP = True elif o in ("-r", "--restore"): _RESTORE = True _APP = a print("RESTORE") elif o in ("-D", "--dry"): _EXECUTE = False elif o in ("-T", "--dry"): _SSH_TEST = True elif o in ("-h", "--help"): print(VERSION) sys.exit() LOG_FILE = "omv_backup.log" if _LOG_LEVEL == "DEBUG": logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') logging.debug('using debug loging') elif _LOG_LEVEL == "ERROR": logging.basicConfig(filename=LOG_FILE, level=logging.ERROR, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') logging.info('using error loging') elif _LOG_LEVEL == "SCAN": logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') logging.info('using error loging') else: logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') logging.info("script started") logger = logging.getLogger(__name__) client_id = "dasdasdasd333" # try: # client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1, client_id) # except: # client = mqtt_client.Client() # client.username_pw_set(mqtt_username, mqtt_password) logging.info(f"Start OMV Backup") BROKER = 'mqtt.home.lan' # e.g., 'mqtt.example.com' PORT = 1883 # Typically 8883 for secure MQTT TOPIC = 'sectorq/backups/start' USERNAME = 'jaydee' PASSWORD = 'jaydee1' USE_TLS = False # Set to False if not using TLS backups = { "nas": { "login": "admin@nas.home.lan", "jobs": { "github": {"source":"/share/Data/__GITHUB", "exclude":"", "active": True }, "photo": { "source":"/share/Photo/Years", "exclude":"", "active":True } } }, "m-server":{ "login": "root@m-server.home.lan", "jobs": { "docker_data":{ "source":"/share/docker_data/", "exclude":"", "active":True }, "fail2ban":{ "source":"/etc/fail2ban/", "exclude":"", "active":True } } }, "rpi5.home.lan":{ "docker_data":{ "source":"/share/docker_data/", "exclude":"", "active":True }, "fail2ban":{ "source":"/etc/fail2ban/", "exclude":"", "active":True } } } BACKUP_FS = "/media/backup/" BACKUP_HOST = "amd.home.lan" #BACKUP_HOST = "morefine.home.lan" logging.info("Test connection") hm = socket.gethostbyaddr(BACKUP_HOST) logging.info(_RESTORE) def send_mqtt_message(topic,msg,qos=0,retain=False): client2 = mqtt.Client() client2.username_pw_set("jaydee", "jaydee1") try: client2.connect("mqtt.home.lan",1883,60) client2.publish(topic, json.dumps(msg), qos=qos, retain=retain) client2.disconnect() logging.info(f"Message1 sent {topic}, {msg}") except ValueError as e: logging.error("Failed to send") print("Failed to send") print(e) if _SYNC: containers = ["HomeAssistant","webhub-web-1","heimdall","pihole","mosquitto-mosquitto-1","mailu3-redis-1","mailu3-webmail-1","mailu3-resolver-1","mailu3-antispam-1","mailu3-webdav-1","mailu3-smtp-1","mailu3-oletools-1","mailu3-front-1","mailu3-fetchmail-1","mailu3-imap-1","matter-server","piper-en","openwakeword","whisper-en","auth-worker-1","auth-server-1","auth-authentik_ldap-1","auth-redis-1","auth-postgresql-1","nginx-app-1"] cmnd = f"curl -H 'Authorization: Bearer l4c1j4yd33Du5lo' 192.168.77.238:8094/v1/update" logging.info(cmnd) status, output = subprocess.getstatusoutput(cmnd) if _START: for c in containers: cmnd = f"docker start {c}" print(cmnd) status, output = subprocess.getstatusoutput(cmnd) if _STOP: cmnd = "docker ps" status, running_containers = subprocess.getstatusoutput(cmnd) logging.info(running_containers) for c in running_containers.splitlines(): print(c.split()[-1]) if c.split()[-1] == "watchtower-watchtower-1": continue cmnd = f"docker stop {c.split()[-1]}" status, running_containers = subprocess.getstatusoutput(cmnd) def restore_job(_APP): logging.info("Starting Restore") now = datetime.datetime.now() STARTTIME = now.strftime("%Y-%m-%d_%H:%M:%S") _DATE = "pick" if _APP == "all": _DATE = "latest" if host == "rpi5.home.lan" or host == "rpi5": _APP = ["__backups", "nginx","ha","gitea","gitlab","mailu","bitwarden","esphome","grafana","ingluxdb","kestra","matter-server","mosquitto","octoprint","octoprint2","pihole","unify_block","webhub","homepage","watchtower"] else: cmnd = "ssh root@amd.home.lan 'ls /mnt/raid/backup/m-server/docker_data/latest'" status, output = subprocess.getstatusoutput(cmnd) _APP = output.splitlines() logging.info(_APP) #input("????") else: _APP = _APP.split(",") PROGRESS = 0 topic = "sectorq/amd/restore" step = 100 / len(_APP) for app in _APP: #msg = {"mode":_MODE, "status":"restore","bak_name":"Restore","host":host,"cur_job":app,"start_time":STARTTIME,"end_time":"","progress":str(round(np.ceil(PROGRESS))) + "%","finished":1,"used_space":1} msg = {"mode":_MODE, "status":"restore","bak_name":"Restore","host":host,"cur_job":app,"start_time":STARTTIME,"end_time":"","progress":str(round(PROGRESS,0)) + "%","finished":1,"used_space":1} logging.info(msg) send_mqtt_message(topic,msg) PROGRESS = PROGRESS + step now = datetime.datetime.now() DATETIME = now.strftime("%Y-%m-%d_%H-%M-%S") BACKUP_HOST = f"root@amd.home.lan" BACKUP_DEVICE = "/mnt/raid" BACKUP_DIR = f"/backup/{host}" if _DATE == "pick": cmnd = f"ssh root@amd.home.lan 'ls {BACKUP_DEVICE}/backup/m-server/docker_data'" status, output = subprocess.getstatusoutput(cmnd) print(output) dates = output.splitlines() n = 1 for i in dates: print(f"{n} - {i}" ) n += 1 ans = input("Pick a backup to restore : ") _DATE = dates[int(ans) - 1] if app == "fail2ban": logging.info("?>?????") NEW_BACKUP_DIR = f"/backup/m-server/fail2ban/{_DATE}/" SOURCE_DIR = f"/etc/fail2ban" else: NEW_BACKUP_DIR = f"/backup/m-server/docker_data/{_DATE}/{app}" SOURCE_DIR = f"/share/docker_data/" if _FIRST: BACKUP_PATH="{}/initial".format(BACKUP_DIR) else: BACKUP_PATH="{}/{}".format(BACKUP_DIR, DATETIME) LATEST_LINK="{}/{}".format(BACKUP_DIR,_DATE) FULL_BACKUP_LATEST = f"{NEW_BACKUP_DIR}/{_DATE}" LATEST_LINK = f"/{host}/{app}/{_DATE}" logging.info("Create backup dir") #logging.info(cmnd) #cmnd = "rsync -av --delete {}/ --link-dest {} --exclude=\".cache\" {}".format(SOURCE_DIR, LATEST_LINK, BACKUP_PATH) if app == "heimdall": logging.info("Stopping docker") cmnd = "docker stop heimdall" status, output = subprocess.getstatusoutput(cmnd) cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}" ans = "y" logging.info(cmnd) logging.info("Sync files") if _TEST: ans = input("continue?") or "n" if ans == "y" and _EXECUTE: status, output = subprocess.getstatusoutput(cmnd) entries = ["Home Assistant","Nginx Proxy Manager","Portainer","Roundcube","Authentik","Kestra"] for e in entries: cmnd = f"sqlite3 /share/docker_data/heimdall/config/www/app.sqlite \"SELECT url FROM items WHERE title = '{e}'\"" logging.info(cmnd) status, output = subprocess.getstatusoutput(cmnd) regex = re.compile(r'[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}') contents = re.sub(regex, IP , output) cmnd = f"sqlite3 /share/docker_data/heimdall/config/www/app.sqlite \"UPDATE items SET url = '{contents}' WHERE title = '{e}'\"" logging.info(cmnd) status, output = subprocess.getstatusoutput(cmnd) # cmnd = "docker start heimdall" # status, output = subprocess.getstatusoutput(cmnd) if app == "ha": logging.info("Stopping docker") cmnd = "docker stop heimdall" status, output = subprocess.getstatusoutput(cmnd) cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}" ans = "y" logging.info(cmnd) logging.info("Sync files") if _TEST: ans = input("continue?") or "n" if ans == "y" and _EXECUTE: status, output = subprocess.getstatusoutput(cmnd) logging.info("Start docker") # cmnd = "docker start heimdall" # status, output = subprocess.getstatusoutput(cmnd) elif app == "fail2ban": logging.info("Stopping docker") cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}" ans = "y" logging.info(cmnd) logging.info("Sync files") if _TEST: ans = input("continue?") or "n" if ans == "y" and _EXECUTE: status, output = subprocess.getstatusoutput(cmnd) logging.info("Start docker") # cmnd = "docker start heimdall" # status, output = subprocess.getstatusoutput(cmnd) elif app == "homepage": logging.info("Stopping docker") cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}" ans = "y" logging.info(cmnd) if _TEST: ans = input("continue?") or "n" if ans == "y" and _EXECUTE: status, output = subprocess.getstatusoutput(cmnd) file = "/share/docker_data/homepage/config/widgets.yaml" with open(file, 'r') as stream: try: loaded = yaml.load(stream, Loader=yaml.FullLoader) except yaml.YAMLError as exc: logging.info(exc) # Modify the fields from the dict #loaded['logo']['icon'] = "/images/morefine2.png" logging.info(json.dumps(loaded, indent=2)) i = 0 for y in loaded: logging.info(i) logging.info(y) if "logo" in y: if host == "rpi5.home.lan" or host == "rpi5": loaded[i]['logo']['icon'] = "/images/rpi5.png" elif host == "nas.home.lan": loaded[i]['logo']['icon'] = "/images/qnap_nas.png" elif host == "rack.home.lan": loaded[i]['logo']['icon'] = "/images/rack.png" else: loaded[i]['logo']['icon'] = "/images/morefine2.png" i+=1 # Save it again logging.info(f"writing to file {file}") with open(file, 'w') as stream: try: yaml.dump(loaded, stream, default_flow_style=False) except yaml.YAMLError as exc: print("failed") print(exc) logging.info("Start docker") # cmnd = "docker start heimdall" # status, output = subprocess.getstatusoutput(cmnd) elif app == "nginx1": logging.info("Stopping docker") cmnd = "docker stop nginx-app-1" status, output = subprocess.getstatusoutput(cmnd) cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}" ans = "y" logging.info(cmnd) logging.info("Sync files") if _TEST: ans = input("continue?") or "n" if ans == "y" and _EXECUTE: status, output = subprocess.getstatusoutput(cmnd) domains = ["sectorq.eu","gitlab.sectorq.eu","ha.sectorq.eu","mail.sectorq.eu","pw.sectorq.eu","semaphore.sectorq.eu","kestra.sectorq.eu","auth.sectorq.eu"] for d in domains: cmnd = f'sqlite3 /share/docker_data/nginx/data/database.sqlite "UPDATE proxy_host SET forward_host = \'{IP}\' WHERE domain_names = \'[\\"{d}\\"]\'"' logging.info(cmnd) status, output = subprocess.getstatusoutput(cmnd) cmnd = 'egrep -l "# bazarr.sectorq.eu|# gitea.sectorq.eu|# jf.sectorq.eu|# kestra.sectorq.eu|# auth.sectorq.eu|# ha.sectorq.eu|# pw.sectorq.eu|# semaphore.sectorq.eu|# sectorq.eu|# gitlab.sectorq.eu|# ha.sectorq.eu" /share/docker_data/nginx/data/nginx/proxy_host/*' status, output = subprocess.getstatusoutput(cmnd) logging.info(output.splitlines()) for file in output.splitlines(): logging.info(file) f = open(file) contents = f.read() f.close() regex = re.compile(r'\n\s+set \$server\s+\"\w+.\w+.\w+.\w+\";') contents = re.sub(regex, f'\n set $server \"{IP}\";', contents) #print(contents) logging.info(regex) f = open(file, "w") contents = f.write(contents) f.close() status, output = subprocess.getstatusoutput(cmnd) logging.info("Starting docker") # cmnd = "docker start nginx-app-1" # status, output = subprocess.getstatusoutput(cmnd) else: cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}" ans = "y" logging.info(cmnd) logging.info("Sync files") if _TEST: ans = input("continue?") or "n" if ans == "y" and _EXECUTE: status, output = subprocess.getstatusoutput(cmnd) now = datetime.datetime.now() ENDJOB = now.strftime("%Y-%m-%d_%H:%M:%S") logging.info("Sending finished status") msg = {"mode":_MODE, "status":"restore","bak_name":"Restore","host":host,"cur_job":app,"start_time":STARTTIME,"end_time":"","progress":100,"finished":ENDJOB,"used_space":1} logging.info(msg) send_mqtt_message(topic,msg) if _MODE == "auto": cmnd = "ssh root@amd.home.lan 'systemctl suspend &'" status, output = subprocess.getstatusoutput(cmnd) def backup_job(pl): client2 = mqtt.Client() client2.username_pw_set("jaydee", "jaydee1") client2.connect("mqtt.home.lan",1883,60) if "log" in pl: if pl["log"] == "debug": logging.info(f'Debug enabled') LOG_FILE = "omv_backup.log" logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') logging.info(f'starting backup job') server = pl["host"] if pl["mode"] == "dry": _DRYRUN = True logging.info("Dry run active") else: _DRYRUN = False logging.info("Full mode active") finished = [] sub_finished = [] now = datetime.datetime.now() STARTTIME = now.strftime("%Y-%m-%d_%H:%M:%S") topic = "sectorq/amd/restore" msg = {"mode":"restore", "status":"restore","bak_name":"s","host":0,"cur_job":"aaa","start_time":1,"end_time":1,"progress":0,"finished":0,"used_space":0} client2.publish(topic, json.dumps(msg),qos=0, retain=True) #client2.publish(topic, msg) topic = "sectorq/amd/backups" msg = {"mode":_MODE, "status":"started","bak_name":"complete","host":"","cur_job":"","start_time":STARTTIME,"end_time":"in progress","progress":0,"finished":",".join(finished)} client2.publish(topic, json.dumps(msg),qos=0, retain=True) # iterate over files in # that directory host = server logging.info("Backup") for b in backups[host]["jobs"]: if not backups[host]["jobs"][b]["active"]: logging.info("Backup {} is not active!".format(b)) msg = {"status":"inactive","bak_name":b,"start_time":"inactive","end_time":"inactive","progress":0} client2.publish(topic, json.dumps(msg),qos=0, retain=True) continue SOURCE_DIR = backups[host]["jobs"][b]["source"] now = datetime.datetime.now() BACKUP_HOST = backups[host]["login"] BACKUP_DEVICE = "/mnt/raid" BACKUP_DIR = f"{BACKUP_HOST}:{SOURCE_DIR}" BACKUP_ROOT = f"{BACKUP_DEVICE}/backup/{host}/{b}" DATETIME = now.strftime("%Y-%m-%d_%H-%M-%S") if _FIRST: NEW_BACKUP_DIR = f"{BACKUP_ROOT}/initial" else: NEW_BACKUP_DIR = f"{BACKUP_ROOT}/{DATETIME}_running" FULL_BACKUP_LATEST = f"{BACKUP_ROOT}/latest" # msg = {"status":"started","bak_name":b,"start_time":DATETIME,"end_time":"in progress", "progress":0} msg = {"mode":_MODE, "status":"started","bak_name":"complete","host":host,"cur_job":b,"start_time":STARTTIME,"end_time":"in progress","progress":0,"finished":",".join(finished)} client2.publish(topic, json.dumps(msg),qos=0, retain=True) cmnd = "mkdir -p " + NEW_BACKUP_DIR logging.info(cmnd) if _EXECUTE: status, output = subprocess.getstatusoutput(cmnd) logging.info(output) logging.info(status) logging.info("Create backup dir") cmnd = f"ssh {BACKUP_HOST} 'ls {SOURCE_DIR}'" logger.debug(cmnd) status, output = subprocess.getstatusoutput(cmnd) logger.debug(output) apps = output.splitlines() c = len(apps) print(apps) print(len(apps)) step = round(100 / c,1) progress = 0 cmd = ['rsync', '-avz', '--delete', BACKUP_DIR, '--link-dest', FULL_BACKUP_LATEST, '--exclude-from=/myapps/exclude.txt', NEW_BACKUP_DIR] logging.info(" ".join(cmd)) topic = "sectorq/amd/backups" if not _DRYRUN: process = subprocess.Popen(cmd, stdout=subprocess.PIPE) while process.poll() is None: line = process.stdout.readline().decode("utf-8").split("/") #print(line[0]) if line[0] in apps: logging.info(f"Working on app {line[0]}") while True: if line[0] != apps[0]: del apps[0] progress = progress + step else: break apps.remove(line[0]) sub_finished.append(line[0]) msg = {"mode":_MODE, "status":"started","bak_name":"complete","host":host,"cur_job":b,"sub":line[0],"start_time":STARTTIME,"end_time":"in progress","progress":str(round(progress)) + "%","finished":",".join(finished),"sub_finished":",".join(sub_finished)} logging.info(f"Sending message with topic {topic} {json.dumps(msg)}") if not "gitea-runner" == line[0]: client2.publish(topic, json.dumps(msg),qos=0, retain=False) progress = progress + step cmnd = f"rm -rf {FULL_BACKUP_LATEST}" #logging.info(cmnd) logging.info("Removing latest link") # input("????") if not _DRYRUN: status, output = subprocess.getstatusoutput(cmnd) if _FIRST: cmnd = f"cd {BACKUP_ROOT}; ln -s initial latest" else: cmnd = f"cd {BACKUP_ROOT}; mv {DATETIME}_running {DATETIME};ln -s {DATETIME} latest" logging.info("Creating new latest link") #print(cmnd) # input("????") if not _DRYRUN: status, output = subprocess.getstatusoutput(cmnd) #Remove old logging.info("Removing old dirs") cmnd = f"ls {BACKUP_ROOT}" if not _DRYRUN: status, output = subprocess.getstatusoutput(cmnd) for f in output.splitlines(): pattern = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}-[0-9]{2}-[0-9]{2}$" # regex pattern: string starts with 'abc' # logging.info(f"Checking {f}") if re.match(pattern, f): # logging.info("Match!") dt = datetime.datetime.strptime(f, "%Y-%m-%d_%H-%M-%S") epoch_time = int(dt.timestamp()) now_epoch = int(datetime.datetime.now().timestamp()) x = now_epoch - epoch_time # logging.info(epoch_time) # Output: 45 if x > 2592000: dir_path = f"{BACKUP_ROOT}/{f}" logging.info(f"removing {dir_path}") shutil.rmtree(dir_path) else: print("No match.") if not _DRYRUN: logging.info(f"Clearing multiple days") multiple_per_day = {} to_remove = [] for f in output.splitlines(): pattern = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}-[0-9]{2}-[0-9]{2}$" # regex pattern: string starts with 'abc' if re.match(pattern, f): cday = f.split("_")[0] if cday in multiple_per_day: multiple_per_day[cday].append(f) else: multiple_per_day[cday] = [f] # # logging.info("Match!") # dt = datetime.datetime.strptime(f, "%Y-%m-%d_%H-%M-%S") # epoch_time = int(dt.timestamp()) # now_epoch = int(datetime.datetime.now().timestamp()) # x = now_epoch - epoch_time # # logging.info(epoch_time) # Output: 45 # if x > 2592000: # dir_path = f"{BACKUP_ROOT}/{f}" # logging.info(f"removing {dir_path}") # shutil.rmtree(dir_path) else: print("No match.") logging.info(f"Clearing multiple days: {multiple_per_day}") for f in multiple_per_day: logging.info(f"Looping multiple_per_day : {f}") if len(multiple_per_day[f]) > 1: last = multiple_per_day[f][-1] multiple_per_day[f].pop() logging.info(f"Last from day: {last}") for d in multiple_per_day[f]: logging.info(f"Looping multiple_per_day : {f} : {d}") dir_path = f"{BACKUP_ROOT}/{d}" logging.info(f"removing {dir_path}") shutil.rmtree(dir_path) cmnd = f"ls {BACKUP_ROOT}|grep _running" logging.info(f"removing obsolete dirs") if not _DRYRUN: status, output = subprocess.getstatusoutput(cmnd) for f in output.splitlines(): dir_path = f"{BACKUP_ROOT}/{f}" logging.info(f"removing {dir_path}") shutil.rmtree(dir_path) now = datetime.datetime.now() ENDTIME = now.strftime("%Y-%m-%d_%H:%M:%S") #msg = {"status":"finished","bak_name":b,"start_time":DATETIME,"end_time":ENDTIME,"progress":0} finished.append(b) msg = {"mode":_MODE, "status":"finished","bak_name":"complete","host":host,"cur_job":b,"start_time":ENDTIME,"end_time":"in progress","progress":0,"finished":",".join(finished)} client2.publish(topic, json.dumps(msg),qos=0, retain=True) logging.info("Getting size of FS") cmnd = "df -h /mnt/raid|awk '{ print $3 }'|tail -1" logging.info(cmnd) status, output = subprocess.getstatusoutput(cmnd) used_space = (output.split())[0] now = datetime.datetime.now() ENDJOB = now.strftime("%Y-%m-%d_%H:%M:%S") logging.info("Size : {}".format(used_space)) logging.info("Sending finished status") #msg = {"mode":_MODE,"status":"finished","bak_name":"complete","start_time":STARTTIME,"end_time":ENDJOB,"progress":0,"used_space":used_space} msg = {"mode":_MODE, "status":"finished","bak_name":"complete","host":host,"cur_job":b,"start_time":STARTTIME,"end_time":ENDTIME,"progress":0,"finished":",".join(finished),"used_space":used_space} logging.info(msg) client2.publish(topic, json.dumps(msg),qos=0, retain=True) topic = "sectorq/backups/start" logging.info(f"LALA : {topic}") client2.publish(topic, "finished",qos=0, retain=True) time.sleep(1) client2.publish(topic, "finished2",qos=0, retain=True) client2.disconnect() #return "finished" if _DRYRUN: return topic = "sectorq/amd/restore" for s in servers: logging.info(f"Restoring {s}") #if s != "rack.home.lan": if s == "m-server.home.lan": continue elif s == "nas.home.lan": user = "admin" cmnd = "/share/Data/python/bin/python3 /share/Data/__GITLAB/omv_backup/omv_backup.py -r all" else: user = "jd" cmnd = "sudo /myapps/omv_backup.py -r all" msg = {"mode":_MODE, "status":"restore","bak_name":"s","host":s,"cur_job":"aaa","start_time":1,"end_time":1,"progress":0,"finished":1,"used_space":1} #logging.info(msg) send_mqtt_message(topic,msg) #continue if is_port_open(s,22): ssh = paramiko.SSHClient() ssh.load_system_host_keys() # Add SSH host key automatically if needed. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Connect to router using username/password authentication. logger.info(f"Sync {s}") print(f"Sync {s}") ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) pkey = paramiko.RSAKey.from_private_key_file("/home/jd/.ssh/id_rsa") ssh.connect(s, username=user, look_for_keys=False, allow_agent=False, pkey=pkey) print(cmnd) ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(cmnd) for line in iter(ssh_stdout.readline, ""): logger.info(line) print(line, end="") for line in iter(ssh_stderr.readline, ""): logger.info(line) ssh.close() try: os.remove("/backups/restore") except: pass return "finished1" if _RESTORE: restore_job(_APP) sys.exit() if _SSH_TEST: user = "root" cmnd = "ls -la" topic = "sectorq/amd/backups" for s in servers: # if s == "m-server.home.lan": # continue # elif s == "nas.home.lan": # user = "admin" # cmnd = "/share/Data/__GITLAB/omv_backup/venv/bin/python3 /share/Data/__GITLAB/omv_backup/omv_backup.py -r all" msg = {"mode":_MODE, "status":"restore","bak_name":"s","host":s,"cur_job":"aaa","start_time":1,"end_time":1,"progress":0,"finished":1,"used_space":1} #logging.info(msg) send_mqtt_message(topic,msg) if s != "rack.home.lan": continue if is_port_open(s,22): ssh = paramiko.SSHClient() ssh.load_system_host_keys() # Add SSH host key automatically if needed. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Connect to router using username/password authentication. logger.info(f"Sync {s}") print(f"Sync {s}") ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) pkey = paramiko.RSAKey.from_private_key_file("/home/jd/.ssh/id_rsa") ssh.connect(s, username=user, look_for_keys=False, allow_agent=False, pkey=pkey) print(cmnd) ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(cmnd) for line in iter(ssh_stdout.readline, ""): logger.info(line) print(line, end="") for line in iter(ssh_stderr.readline, ""): logger.info(line) ssh.close() # Define actions based on payload def handle_payload(payload): try: pl = json.loads(payload) except: pl = payload logging.debug(pl) if "host" in pl: if pl["host"] == 'm-server': logging.info("💡 Starting backup job") backup_job(pl) logging.info(f"💡 Finished backup job") elif pl["host"] == 'nas': logging.info("💡 Starting backup job") backup_job(pl) logging.info(f"💡 Finished backup job") else: logging.error(f"⚠️ Unknown command: {pl}") else: logging.error(f"⚠️ Wrong payload: {pl}") # Callback when connected def on_connect(client, userdata, flags, rc): if rc == 0: logging.info("✅ Connected securely to broker") client.subscribe(TOPIC) logging.info(f"📡 Subscribed to topic: {TOPIC}") else: logging.error(f"❌ Connection failed with code {rc}") # Callback when a message is received def on_message(client, userdata, msg): payload = msg.payload.decode() logging.info(f"📨 Received message: {payload} on topic: {msg.topic}") handle_payload(payload) # MQTT client setup client = mqtt.Client() client.username_pw_set(USERNAME, PASSWORD) client.on_connect = on_connect client.on_message = on_message # Use TLS for encrypted connection if USE_TLS: client.tls_set() # You can customize this with certs if needed # Connect and loop forever client.connect(BROKER, PORT, keepalive=60) client.publish("sectorq/backups/start", "finished", qos=0, retain=True) client.loop_forever()