# Mirror of https://gitlab.sectorq.eu/jaydee/omv_backup.git
# Synced 2025-09-13 12:10:12 +02:00
# 793 lines, 30 KiB, Python, executable file
#!/myapps/venv/bin/python3
|
|
import datetime
|
|
import logging
|
|
from paho.mqtt import client as mqtt_client
|
|
import getopt
|
|
import json
|
|
import time
|
|
import socket
|
|
import subprocess
|
|
from subprocess import Popen, PIPE, CalledProcessError
|
|
import sys
|
|
import os
|
|
import re
|
|
import platform
|
|
import requests
|
|
import fnmatch
|
|
import yaml
|
|
import paramiko
|
|
import shutil
|
|
import signal
|
|
import paho.mqtt.client as mqtt
|
|
#import numpy as np
|
|
|
|
def signal_handler(sig, frame):
    """SIGINT (Ctrl+C) handler: announce, release the discovery socket, exit 0.

    BUGFIX(review): the original called ``conn.close()``, but ``conn`` is the
    boolean "connected" flag used by the IP-discovery loop below — the actual
    socket is ``s`` — so pressing Ctrl+C raised AttributeError instead of
    shutting down cleanly.
    """
    print('You pressed Ctrl+C!')
    try:
        s.close()
    except (NameError, OSError):
        # The handler is installed before `s` exists, so tolerate a missing
        # or already-closed socket.
        pass
    sys.exit(0)
|
|
|
|
|
|
|
|
# Install the Ctrl+C handler before anything slow or blocking runs.
signal.signal(signal.SIGINT, signal_handler)
file_path = os.path.realpath(__file__)
dir_path = os.path.dirname(file_path)
VERSION="1.0.9"
# print(file_path)
# print(dir_path)
# Run relative to the script's own directory so relative paths (log file,
# exclude list) resolve regardless of the caller's CWD.
os.chdir(dir_path)
# Imported after chdir; send_magic_packet is not referenced in this chunk —
# presumably used to wake the backup host elsewhere (TODO confirm).
from wakeonlan import send_magic_packet
pid = os.getpid()
|
|
def is_port_open(host, port, timeout=None):
    """Return True when a TCP connection to ``(host, port)`` succeeds.

    :param host: hostname or IP address to probe.
    :param port: TCP port number.
    :param timeout: optional connect timeout in seconds; ``None`` keeps the
        OS default (the original behaviour). Added so callers can avoid
        hanging indefinitely on filtered ports.
    :returns: True if the connection was established, False on any socket error.
    """
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except socket.error:
        return False
    else:
        # Close only on success; the failure path never produced a socket.
        sock.close()
        return True
|
|
# Fleet of machines that participate in the backup/restore cycle.
servers = ["rpi5.home.lan","nas.home.lan","rack.home.lan","m-server.home.lan"]
host = platform.node().lower()
#input(host)
# Singleton guard: bail out if another copy of this script is running.
# NOTE(review): the pattern greps for "omv_backups.py" (plural) while this
# file appears to be omv_backup.py — if so, the guard never matches and is a
# no-op; confirm the deployed filename.
cmnd = "ps -ef|grep omv_backups.py|grep -v grep |grep -v {}|wc -l".format(pid)
status, output = subprocess.getstatusoutput(cmnd)
if int(output) > 0:
    print("Running already!")
    sys.exit()
|
|
|
|
def is_port_open(host, port):
    """Probe a TCP endpoint; True when a connection can be established.

    NOTE(review): duplicate of the earlier is_port_open definition; being the
    later one, this definition is the one in effect at runtime.
    """
    reachable = False
    try:
        probe = socket.create_connection((host, port))
    except socket.error:
        reachable = False
    else:
        probe.close()
        reachable = True
    return reachable
|
|
# Discover this machine's LAN IP by "connecting" a UDP socket toward the
# gateway; UDP connect sends no packets, it only selects a local interface.
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# doesn't even have to be reachable
conn = False
# Retry forever until the network stack is up (e.g. early at boot).
while not conn:
    try:
        s.connect(('192.168.77.1', 1))
        IP = s.getsockname()[0]
        conn = True
    except:
        # NOTE(review): bare except also swallows KeyboardInterrupt here;
        # SIGINT is separately handled by signal_handler.
        time.sleep(5)

# MQTT broker settings (these lowercase ones appear unused below; the
# uppercase BROKER/PORT/... constants are what the subscriber uses).
broker = 'mqtt.home.lan'
port = 1883
topic_sum = "sectorq/amd/backups"
mqtt_username = 'jaydee'
mqtt_password = 'jaydee1'
try:
    opts, args = getopt.getopt(sys.argv[1:], "hTamftDr:bd:sSOl:", ["command=", "help", "output="])
except getopt.GetoptError as err:
    #usage()
    sys.exit(2)
|
|
output = None
# QJ : getopts
# Defaults for all command-line switches.
_MODE = "manual"        # "auto" (-a): unattended run; suspends backup host afterwards
_FIRST = _TEST = _RESTORE = _BACKUP = _SYNC = _START = _STOP = _SSH_TEST = False
_EXECUTE = True         # -D (dry run) disables the destructive shell commands
_DATE = "pick"          # snapshot selector for restores ("pick" = interactive)
_LOG_LEVEL = ""
for o, a in opts:
    if o == "-a":
        _MODE = "auto"
    elif o in ("-m", "--manual"):
        # NOTE(review): neither "-m" nor "--manual" is declared in the getopt
        # spec above, so this branch is unreachable as-is.
        _MODE = "manual"
    elif o in ("-l", "--level"):
        _LOG_LEVEL = a.upper()
    elif o in ("-f", "--first"):
        _FIRST = True
    elif o in ("-d", "--date"):
        _DATE = a
    elif o in ("-t", "--test"):
        _TEST = True
    elif o in ("-s", "--sync"):
        _SYNC = True
    elif o in ("-S", "--start"):
        _START = True
    elif o in ("-O", "--stop"):
        _STOP = True
    elif o in ("-r", "--restore"):
        _RESTORE = True
        _APP = a          # comma-separated app list, or "all"
        print("RESTORE")
    elif o in ("-D", "--dry"):
        _EXECUTE = False
    elif o in ("-T", "--dry"):
        # NOTE(review): long name duplicates "--dry" above — presumably meant
        # "--ssh-test"; harmless because long options are not declared anyway.
        _SSH_TEST = True
    elif o in ("-h", "--help"):
        print(VERSION)
        sys.exit()
|
|
LOG_FILE = "omv_backup.log"
# Configure root logging once, based on -l LEVEL. All branches share the same
# file/format; only the level differs ("SCAN" behaves like DEBUG).
if _LOG_LEVEL == "DEBUG":
    logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    logging.debug('using debug loging')
elif _LOG_LEVEL == "ERROR":
    logging.basicConfig(filename=LOG_FILE, level=logging.ERROR, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    logging.info('using error loging')
elif _LOG_LEVEL == "SCAN":
    logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    logging.info('using error loging')
else:
    logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    logging.info("script started")

logger = logging.getLogger(__name__)
client_id = "dasdasdasd333"
# try:
# client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1, client_id)
# except:
# client = mqtt_client.Client()
# client.username_pw_set(mqtt_username, mqtt_password)

logging.info(f"Start OMV Backup")
# Connection settings for the command-subscribing MQTT client created at the
# bottom of this script.
BROKER = 'mqtt.home.lan'  # e.g., 'mqtt.example.com'
PORT = 1883  # Typically 8883 for secure MQTT
TOPIC = 'sectorq/backups/start'
USERNAME = 'jaydee'
PASSWORD = 'jaydee1'
USE_TLS = False  # Set to False if not using TLS
|
|
# Backup job configuration, keyed by source machine.
# Schema: {host_key: {"login": ssh user@host,
#                     "jobs": {job: {"source": path,
#                                    "exclude": rsync exclude spec,
#                                    "active": bool}}}}
# backup_job() reads cfg["login"] and cfg["jobs"][job]["source"/"active"].
backups = {
    "nas": {
        "login": "admin@nas.home.lan",
        "jobs": {
            "github": {
                "source": "/share/Data/__GITHUB",
                "exclude": "",
                "active": True
            },
            "photo": {
                "source": "/share/Photo/Years",
                "exclude": "",
                "active": True
            }
        }
    },
    "m-server": {
        "login": "root@m-server.home.lan",
        "jobs": {
            "docker_data": {
                "source": "/share/docker_data/",
                "exclude": "",
                "active": True
            },
            "fail2ban": {
                "source": "/etc/fail2ban/",
                "exclude": "",
                "active": True
            }
        }
    },
    # BUGFIX(review): this entry previously lacked the "login"/"jobs" nesting
    # (jobs sat directly under the host key), so backup_job("rpi5.home.lan")
    # raised KeyError on ["jobs"] and ["login"]; normalized to the schema.
    # TODO confirm the SSH account for rpi5 (root assumed, matching m-server).
    "rpi5.home.lan": {
        "login": "root@rpi5.home.lan",
        "jobs": {
            "docker_data": {
                "source": "/share/docker_data/",
                "exclude": "",
                "active": True
            },
            "fail2ban": {
                "source": "/etc/fail2ban/",
                "exclude": "",
                "active": True
            }
        }
    }
}
|
|
|
|
# Root of the mounted backup filesystem (not referenced again in this file).
BACKUP_FS = "/media/backup/"
# Machine holding the backup RAID.
BACKUP_HOST = "amd.home.lan"
#BACKUP_HOST = "morefine.home.lan"

logging.info("Test connection")
# Early DNS sanity check; raises socket.herror/gaierror and aborts the script
# if the backup host cannot be resolved.
hm = socket.gethostbyaddr(BACKUP_HOST)

logging.info(_RESTORE)
|
|
def send_mqtt_message(topic, msg, qos=0, retain=False):
    """Publish ``msg`` (JSON-encoded) to ``topic`` via a short-lived client.

    :param topic: MQTT topic to publish to.
    :param msg: any JSON-serializable payload (dicts are used throughout).
    :param qos: MQTT quality-of-service level.
    :param retain: whether the broker should retain the message.

    Best-effort: every failure (DNS, refused connection, publish/serialize
    error) is logged and swallowed so a down broker never aborts a backup or
    restore run.
    """
    client2 = mqtt.Client()
    client2.username_pw_set("jaydee", "jaydee1")
    try:
        client2.connect("mqtt.home.lan", 1883, 60)
        client2.publish(topic, json.dumps(msg), qos=qos, retain=retain)
        client2.disconnect()
        logging.info(f"Message1 sent {topic}, {msg}")
    except Exception as e:
        # BUGFIX(review): the original caught only ValueError, so socket and
        # DNS errors from connect() escaped and crashed the caller of this
        # best-effort helper; broadened to Exception.
        logging.error("Failed to send")
        print("Failed to send")
        print(e)
|
|
|
|
# Containers managed by the start/stop helpers.
# BUGFIX(review): this list was defined inside `if _SYNC:`, so invoking the
# script with -S (start) but without -s (sync) raised NameError; hoisted out.
containers = ["HomeAssistant","webhub-web-1","heimdall","pihole","mosquitto-mosquitto-1","mailu3-redis-1","mailu3-webmail-1","mailu3-resolver-1","mailu3-antispam-1","mailu3-webdav-1","mailu3-smtp-1","mailu3-oletools-1","mailu3-front-1","mailu3-fetchmail-1","mailu3-imap-1","matter-server","piper-en","openwakeword","whisper-en","auth-worker-1","auth-server-1","auth-authentik_ldap-1","auth-redis-1","auth-postgresql-1","nginx-app-1"]

if _SYNC:
    # Ask the updater service on .238 to pull new images.
    cmnd = f"curl -H 'Authorization: Bearer l4c1j4yd33Du5lo' 192.168.77.238:8094/v1/update"
    logging.info(cmnd)
    status, output = subprocess.getstatusoutput(cmnd)

if _START:
    # Start every managed container.
    for c in containers:
        cmnd = f"docker start {c}"
        print(cmnd)
        status, output = subprocess.getstatusoutput(cmnd)

if _STOP:
    # Stop everything currently running except watchtower itself.
    cmnd = "docker ps"
    status, running_containers = subprocess.getstatusoutput(cmnd)

    logging.info(running_containers)
    # NOTE(review): the first `docker ps` line is the column header, whose
    # last token is "NAMES" — docker stop on it fails harmlessly.
    for c in running_containers.splitlines():
        print(c.split()[-1])
        if c.split()[-1] == "watchtower-watchtower-1":
            continue
        cmnd = f"docker stop {c.split()[-1]}"
        status, running_containers = subprocess.getstatusoutput(cmnd)
|
|
def restore_job(_APP, _DATE=None):
    """Restore one or more application backups from the backup host via rsync.

    :param _APP: "all" to restore every app known for this host, or a
        comma-separated string of app names.
    :param _DATE: snapshot to restore; ``None`` falls back to the module-level
        -d option ("pick" prompts interactively, "latest" uses the newest).

    BUGFIX(review): the original assigned ``_DATE`` locally (making it
    function-local everywhere) while also reading it, so every restore where
    ``_APP != "all"`` raised UnboundLocalError at ``if _DATE == "pick"``.
    The optional parameter restores the intended behaviour and stays
    backward-compatible for existing callers.
    """
    if _DATE is None:
        _DATE = globals().get("_DATE", "pick")
    logging.info("Starting Restore")
    now = datetime.datetime.now()
    STARTTIME = now.strftime("%Y-%m-%d_%H:%M:%S")
    if _APP == "all":
        _DATE = "latest"
        if host == "rpi5.home.lan" or host == "rpi5":
            # Fixed list of apps hosted on the rpi5 node.
            _APP = ["__backups", "nginx","ha","gitea","gitlab","mailu","bitwarden","esphome","grafana","ingluxdb","kestra","matter-server","mosquitto","octoprint","octoprint2","pihole","unify_block","webhub","homepage","watchtower"]
        else:
            # Ask the backup host which apps exist in the latest snapshot.
            cmnd = "ssh root@amd.home.lan 'ls /mnt/raid/backup/m-server/docker_data/latest'"
            status, output = subprocess.getstatusoutput(cmnd)
            _APP = output.splitlines()
            logging.info(_APP)
            #input("????")
    else:
        _APP = _APP.split(",")
    PROGRESS = 0
    topic = "sectorq/amd/restore"
    step = 100 / len(_APP)  # progress share per app, in percent
    for app in _APP:
        # Publish per-app progress for the dashboard.
        #msg = {"mode":_MODE, "status":"restore","bak_name":"Restore","host":host,"cur_job":app,"start_time":STARTTIME,"end_time":"","progress":str(round(np.ceil(PROGRESS))) + "%","finished":1,"used_space":1}
        msg = {"mode":_MODE, "status":"restore","bak_name":"Restore","host":host,"cur_job":app,"start_time":STARTTIME,"end_time":"","progress":str(round(PROGRESS,0)) + "%","finished":1,"used_space":1}
        logging.info(msg)

        send_mqtt_message(topic,msg)
        PROGRESS = PROGRESS + step
        now = datetime.datetime.now()
        DATETIME = now.strftime("%Y-%m-%d_%H-%M-%S")
        BACKUP_HOST = f"root@amd.home.lan"
        BACKUP_DEVICE = "/mnt/raid"
        BACKUP_DIR = f"/backup/{host}"

        if _DATE == "pick":
            # Interactive snapshot selection; once picked, _DATE holds a
            # concrete date so later iterations skip the prompt.
            cmnd = f"ssh root@amd.home.lan 'ls {BACKUP_DEVICE}/backup/m-server/docker_data'"
            status, output = subprocess.getstatusoutput(cmnd)
            print(output)
            dates = output.splitlines()
            n = 1
            for i in dates:
                print(f"{n} - {i}" )
                n += 1

            ans = input("Pick a backup to restore : ")
            _DATE = dates[int(ans) - 1]

        # Map the app to its snapshot dir and restore target.
        if app == "fail2ban":
            logging.info("?>?????")
            NEW_BACKUP_DIR = f"/backup/m-server/fail2ban/{_DATE}/"
            SOURCE_DIR = f"/etc/fail2ban"
        else:
            NEW_BACKUP_DIR = f"/backup/m-server/docker_data/{_DATE}/{app}"
            SOURCE_DIR = f"/share/docker_data/"
        if _FIRST:
            BACKUP_PATH="{}/initial".format(BACKUP_DIR)
        else:
            BACKUP_PATH="{}/{}".format(BACKUP_DIR, DATETIME)
            LATEST_LINK="{}/{}".format(BACKUP_DIR,_DATE)
        FULL_BACKUP_LATEST = f"{NEW_BACKUP_DIR}/{_DATE}"
        # NOTE(review): unconditionally overwrites LATEST_LINK from above;
        # BACKUP_PATH/FULL_BACKUP_LATEST are not used later in this function.
        LATEST_LINK = f"/{host}/{app}/{_DATE}"

        logging.info("Create backup dir")
        #logging.info(cmnd)

        #cmnd = "rsync -av --delete {}/ --link-dest {} --exclude=\".cache\" {}".format(SOURCE_DIR, LATEST_LINK, BACKUP_PATH)

        if app == "heimdall":
            logging.info("Stopping docker")
            cmnd = "docker stop heimdall"
            status, output = subprocess.getstatusoutput(cmnd)
            cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}"
            ans = "y"
            logging.info(cmnd)
            logging.info("Sync files")
            if _TEST:
                ans = input("continue?") or "n"
            if ans == "y" and _EXECUTE:
                status, output = subprocess.getstatusoutput(cmnd)

            # Rewrite dashboard entries so their URLs point at this host's IP.
            entries = ["Home Assistant","Nginx Proxy Manager","Portainer","Roundcube","Authentik","Kestra"]
            for e in entries:
                cmnd = f"sqlite3 /share/docker_data/heimdall/config/www/app.sqlite \"SELECT url FROM items WHERE title = '{e}'\""
                logging.info(cmnd)
                status, output = subprocess.getstatusoutput(cmnd)
                # NOTE(review): unescaped dots match any character; works for
                # dotted IPv4 but is looser than intended.
                regex = re.compile(r'[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}')
                contents = re.sub(regex, IP , output)
                cmnd = f"sqlite3 /share/docker_data/heimdall/config/www/app.sqlite \"UPDATE items SET url = '{contents}' WHERE title = '{e}'\""
                logging.info(cmnd)
                status, output = subprocess.getstatusoutput(cmnd)
            # cmnd = "docker start heimdall"
            # status, output = subprocess.getstatusoutput(cmnd)

        if app == "ha":
            logging.info("Stopping docker")
            cmnd = "docker stop heimdall"
            status, output = subprocess.getstatusoutput(cmnd)
            cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}"
            ans = "y"
            logging.info(cmnd)
            logging.info("Sync files")
            if _TEST:
                ans = input("continue?") or "n"
            if ans == "y" and _EXECUTE:
                status, output = subprocess.getstatusoutput(cmnd)
            logging.info("Start docker")
            # cmnd = "docker start heimdall"
            # status, output = subprocess.getstatusoutput(cmnd)
        elif app == "fail2ban":
            logging.info("Stopping docker")
            cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}"
            ans = "y"
            logging.info(cmnd)
            logging.info("Sync files")
            if _TEST:
                ans = input("continue?") or "n"
            if ans == "y" and _EXECUTE:
                status, output = subprocess.getstatusoutput(cmnd)
            logging.info("Start docker")
            # cmnd = "docker start heimdall"
            # status, output = subprocess.getstatusoutput(cmnd)
        elif app == "homepage":
            logging.info("Stopping docker")
            cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}"
            ans = "y"
            logging.info(cmnd)

            if _TEST:
                ans = input("continue?") or "n"
            if ans == "y" and _EXECUTE:
                status, output = subprocess.getstatusoutput(cmnd)

            # Patch the restored homepage config so the logo matches this host.
            file = "/share/docker_data/homepage/config/widgets.yaml"
            with open(file, 'r') as stream:
                try:
                    loaded = yaml.load(stream, Loader=yaml.FullLoader)
                except yaml.YAMLError as exc:
                    logging.info(exc)

            # Modify the fields from the dict
            #loaded['logo']['icon'] = "/images/morefine2.png"
            logging.info(json.dumps(loaded, indent=2))
            i = 0

            for y in loaded:
                logging.info(i)
                logging.info(y)

                if "logo" in y:
                    if host == "rpi5.home.lan" or host == "rpi5":
                        loaded[i]['logo']['icon'] = "/images/rpi5.png"
                    elif host == "nas.home.lan":
                        loaded[i]['logo']['icon'] = "/images/qnap_nas.png"
                    elif host == "rack.home.lan":
                        loaded[i]['logo']['icon'] = "/images/rack.png"
                    else:
                        loaded[i]['logo']['icon'] = "/images/morefine2.png"
                i+=1

            # Save it again
            logging.info(f"writing to file {file}")
            with open(file, 'w') as stream:
                try:
                    yaml.dump(loaded, stream, default_flow_style=False)
                except yaml.YAMLError as exc:
                    print("failed")
                    print(exc)

            logging.info("Start docker")
            # cmnd = "docker start heimdall"
            # status, output = subprocess.getstatusoutput(cmnd)
        elif app == "nginx1":
            logging.info("Stopping docker")
            cmnd = "docker stop nginx-app-1"
            status, output = subprocess.getstatusoutput(cmnd)
            cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}"
            ans = "y"
            logging.info(cmnd)
            logging.info("Sync files")
            if _TEST:
                ans = input("continue?") or "n"
            if ans == "y" and _EXECUTE:
                status, output = subprocess.getstatusoutput(cmnd)

            # Point every proxied domain at this host's IP in the NPM DB.
            domains = ["sectorq.eu","gitlab.sectorq.eu","ha.sectorq.eu","mail.sectorq.eu","pw.sectorq.eu","semaphore.sectorq.eu","kestra.sectorq.eu","auth.sectorq.eu"]
            for d in domains:
                cmnd = f'sqlite3 /share/docker_data/nginx/data/database.sqlite "UPDATE proxy_host SET forward_host = \'{IP}\' WHERE domain_names = \'[\\"{d}\\"]\'"'
                logging.info(cmnd)
                status, output = subprocess.getstatusoutput(cmnd)

            # Rewrite the generated nginx vhost files as well.
            cmnd = 'egrep -l "# bazarr.sectorq.eu|# gitea.sectorq.eu|# jf.sectorq.eu|# kestra.sectorq.eu|# auth.sectorq.eu|# ha.sectorq.eu|# pw.sectorq.eu|# semaphore.sectorq.eu|# sectorq.eu|# gitlab.sectorq.eu|# ha.sectorq.eu" /share/docker_data/nginx/data/nginx/proxy_host/*'
            status, output = subprocess.getstatusoutput(cmnd)
            logging.info(output.splitlines())
            for file in output.splitlines():
                logging.info(file)
                f = open(file)
                contents = f.read()
                f.close()
                regex = re.compile(r'\n\s+set \$server\s+\"\w+.\w+.\w+.\w+\";')
                contents = re.sub(regex, f'\n set $server \"{IP}\";', contents)
                #print(contents)
                logging.info(regex)
                f = open(file, "w")
                contents = f.write(contents)
                f.close()
            # NOTE(review): re-runs the egrep command; its result is discarded.
            status, output = subprocess.getstatusoutput(cmnd)
            logging.info("Starting docker")
            # cmnd = "docker start nginx-app-1"
            # status, output = subprocess.getstatusoutput(cmnd)
        else:
            # Generic app: plain rsync restore, no post-processing.
            cmnd = f"rsync -avz --delete {BACKUP_HOST}:{BACKUP_DEVICE}{NEW_BACKUP_DIR} {SOURCE_DIR}"
            ans = "y"
            logging.info(cmnd)
            logging.info("Sync files")
            if _TEST:
                ans = input("continue?") or "n"
            if ans == "y" and _EXECUTE:
                status, output = subprocess.getstatusoutput(cmnd)

    # Final status for the whole restore run (app is the last one processed).
    now = datetime.datetime.now()
    ENDJOB = now.strftime("%Y-%m-%d_%H:%M:%S")
    logging.info("Sending finished status")

    msg = {"mode":_MODE, "status":"restore","bak_name":"Restore","host":host,"cur_job":app,"start_time":STARTTIME,"end_time":"","progress":100,"finished":ENDJOB,"used_space":1}
    logging.info(msg)
    send_mqtt_message(topic,msg)
    if _MODE == "auto":
        # Unattended run: put the backup host back to sleep.
        cmnd = "ssh root@amd.home.lan 'systemctl suspend &'"
        status, output = subprocess.getstatusoutput(cmnd)
|
|
|
|
def backup_job(server):
    """Back up every active job configured for ``server`` (a key of ``backups``),
    then trigger a "restore all" run on the other servers over SSH.

    Per job: rsync the remote source into a timestamped snapshot on the local
    RAID (hard-linked against the previous ``latest`` snapshot), rotate the
    ``latest`` symlink, prune snapshots older than 30 days, and publish
    progress to MQTT throughout.
    """
    # Long-lived MQTT connection for the many status messages below (unlike
    # send_mqtt_message, which reconnects per message).
    client2 = mqtt.Client()
    client2.username_pw_set("jaydee", "jaydee1")
    client2.connect("mqtt.home.lan",1883,60)
    logging.info(f'starting backup job')

    finished = []  # job names completed so far; joined into status payloads
    now = datetime.datetime.now()
    STARTTIME = now.strftime("%Y-%m-%d_%H:%M:%S")
    topic = "sectorq/amd/restore"
    # Reset the retained restore-status message on the dashboard topic.
    msg = {"mode":"restore", "status":"restore","bak_name":"s","host":0,"cur_job":"aaa","start_time":1,"end_time":1,"progress":0,"finished":0,"used_space":0}

    client2.publish(topic, json.dumps(msg),qos=0, retain=True)
    #client2.publish(topic, msg)
    topic = "sectorq/amd/backups"
    msg = {"mode":_MODE, "status":"started","bak_name":"complete","host":"","cur_job":"","start_time":STARTTIME,"end_time":"in progress","progress":0,"finished":",".join(finished)}
    client2.publish(topic, json.dumps(msg),qos=0, retain=True)

    # iterate over files in
    # that directory

    host = server  # shadows the module-level `host` for the rest of this function
    logging.info("Backup")
    for b in backups[host]["jobs"]:

        if not backups[host]["jobs"][b]["active"]:
            logging.info("Backup {} is not active!".format(b))
            msg = {"status":"inactive","bak_name":b,"start_time":"inactive","end_time":"inactive","progress":0}
            client2.publish(topic, json.dumps(msg),qos=0, retain=True)
            continue

        SOURCE_DIR = backups[host]["jobs"][b]["source"]
        now = datetime.datetime.now()
        BACKUP_HOST = backups[host]["login"]
        BACKUP_DEVICE = "/mnt/raid"
        BACKUP_DIR = f"{BACKUP_HOST}:{SOURCE_DIR}"          # rsync remote source spec
        BACKUP_ROOT = f"{BACKUP_DEVICE}/backup/{host}/{b}"  # local snapshot root
        DATETIME = now.strftime("%Y-%m-%d_%H-%M-%S")

        # First-ever run writes "initial"; normal runs write to a temporary
        # *_running directory that is renamed into place on success.
        if _FIRST:
            NEW_BACKUP_DIR = f"{BACKUP_ROOT}/initial"
        else:
            NEW_BACKUP_DIR = f"{BACKUP_ROOT}/{DATETIME}_running"

        FULL_BACKUP_LATEST = f"{BACKUP_ROOT}/latest"

        # msg = {"status":"started","bak_name":b,"start_time":DATETIME,"end_time":"in progress", "progress":0}
        msg = {"mode":_MODE, "status":"started","bak_name":"complete","host":host,"cur_job":b,"start_time":STARTTIME,"end_time":"in progress","progress":0,"finished":",".join(finished)}

        client2.publish(topic, json.dumps(msg),qos=0, retain=True)

        cmnd = "mkdir -p " + NEW_BACKUP_DIR
        logging.info(cmnd)

        if _EXECUTE:
            status, output = subprocess.getstatusoutput(cmnd)
            logging.info(output)
            logging.info(status)
        logging.info("Create backup dir")

        # List the remote top-level entries so rsync's streamed output can be
        # mapped to per-app progress below.
        cmnd = f"ssh {BACKUP_HOST} 'ls {SOURCE_DIR}'"
        logger.debug(cmnd)
        status, output = subprocess.getstatusoutput(cmnd)
        logger.debug(output)
        apps = output.splitlines()
        c = len(apps)  # NOTE(review): ZeroDivisionError below if the remote ls is empty
        print(apps)
        print(len(apps))

        step = round(100 / c,1)  # percent share per top-level entry
        progress = 0
        # Incremental snapshot: unchanged files hard-link against `latest`.
        cmd = ['rsync', '-avz', '--delete', BACKUP_DIR, '--link-dest', FULL_BACKUP_LATEST, '--exclude-from=/myapps/exclude.txt', NEW_BACKUP_DIR]
        logging.info(" ".join(cmd))
        process = subprocess.Popen(cmd,
                                   stdout=subprocess.PIPE)

        # The first path component of each transferred file names the app
        # currently being copied.
        while process.poll() is None:
            line = process.stdout.readline().decode("utf-8").split("/")
            print(line[0])
            if line[0] in apps:
                logging.info(f"Working on app {line[0]}")
                # Credit progress for apps rsync skipped entirely; assumes
                # rsync emits entries in the same order as `ls` — TODO confirm.
                while True:
                    if line[0] != apps[0]:
                        del apps[0]
                        progress = progress + step
                    else:
                        break
                apps.remove(line[0])
                #print(len(apps))
                topic = "sectorq/amd/backups"
                msg = {"mode":_MODE, "status":"started","bak_name":"complete","host":host,"cur_job":b,"sub":line[0],"start_time":STARTTIME,"end_time":"in progress","progress":str(round(progress)) + "%","finished":",".join(finished)}
                client2.publish(topic, json.dumps(msg),qos=0, retain=False)
                progress = progress + step

        cmnd = f"rm -rf {FULL_BACKUP_LATEST}"

        #logging.info(cmnd)
        logging.info("Removing latest link")
        # input("????")
        if _EXECUTE:
            status, output = subprocess.getstatusoutput(cmnd)
        if _FIRST:
            cmnd = f"cd {BACKUP_ROOT}; ln -s initial latest"
        else:
            # Promote the *_running snapshot and re-point `latest` at it.
            cmnd = f"cd {BACKUP_ROOT}; mv {DATETIME}_running {DATETIME};ln -s {DATETIME} latest"
        logging.info("Creating new latest link")
        #print(cmnd)
        # input("????")
        if _EXECUTE:
            status, output = subprocess.getstatusoutput(cmnd)

        #Remove old
        logging.info("Removing old dirs")

        cmnd = f"ls {BACKUP_ROOT}"

        if _EXECUTE:
            status, output = subprocess.getstatusoutput(cmnd)
            for f in output.splitlines():
                pattern = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}-[0-9]{2}-[0-9]{2}$" # snapshot dir name format
                # logging.info(f"Checking {f}")
                if re.match(pattern, f):
                    # logging.info("Match!")
                    dt = datetime.datetime.strptime(f, "%Y-%m-%d_%H-%M-%S")
                    epoch_time = int(dt.timestamp())

                    now_epoch = int(datetime.datetime.now().timestamp())
                    x = now_epoch - epoch_time
                    # logging.info(epoch_time) # Output: 45
                    if x > 2592000:  # 30 days in seconds
                        dir_path = f"{BACKUP_ROOT}/{f}"
                        logging.info(f"removing {dir_path}")
                        shutil.rmtree(dir_path)
                else:
                    print("No match.")

        # Clean up any *_running leftovers from interrupted runs.
        cmnd = f"ls {BACKUP_ROOT}|grep _running"
        logging.info(f"removing obsolete dirs")
        if _EXECUTE:
            status, output = subprocess.getstatusoutput(cmnd)
            for f in output.splitlines():
                dir_path = f"{BACKUP_ROOT}/{f}"
                logging.info(f"removing {dir_path}")
                shutil.rmtree(dir_path)

        now = datetime.datetime.now()
        ENDTIME = now.strftime("%Y-%m-%d_%H:%M:%S")
        #msg = {"status":"finished","bak_name":b,"start_time":DATETIME,"end_time":ENDTIME,"progress":0}
        finished.append(b)
        msg = {"mode":_MODE, "status":"finished","bak_name":"complete","host":host,"cur_job":b,"start_time":ENDTIME,"end_time":"in progress","progress":0,"finished":",".join(finished)}
        client2.publish(topic, json.dumps(msg),qos=0, retain=True)

    logging.info("Getting size of FS")
    cmnd = "df -h /mnt/raid|awk '{ print $3 }'|tail -1"
    logging.info(cmnd)
    status, output = subprocess.getstatusoutput(cmnd)
    used_space = (output.split())[0]
    now = datetime.datetime.now()
    ENDJOB = now.strftime("%Y-%m-%d_%H:%M:%S")
    logging.info("Size : {}".format(used_space))
    logging.info("Sending finished status")
    #msg = {"mode":_MODE,"status":"finished","bak_name":"complete","start_time":STARTTIME,"end_time":ENDJOB,"progress":0,"used_space":used_space}
    # NOTE(review): `b`/`ENDTIME` are loop leftovers — NameError if the jobs
    # dict was empty.
    msg = {"mode":_MODE, "status":"finished","bak_name":"complete","host":host,"cur_job":b,"start_time":STARTTIME,"end_time":ENDTIME,"progress":0,"finished":",".join(finished),"used_space":used_space}
    logging.info(msg)

    client2.publish(topic, json.dumps(msg),qos=0, retain=True)
    topic = "sectorq/backups/start"
    logging.info(f"LALA : {topic}")
    client2.publish(topic, "finished",qos=0, retain=True)
    client2.publish(topic, "finished2",qos=0, retain=True)
    client2.disconnect()
    #return "finished"

    # Fan the fresh backup back out: run "restore all" on each other server.
    topic = "sectorq/amd/restore"
    for s in servers:
        logging.info(f"Restoring {s}")
        #if s != "rack.home.lan":
        if s == "m-server.home.lan":
            continue
        elif s == "nas.home.lan":
            user = "admin"
            cmnd = "/share/Data/python/bin/python3 /share/Data/__GITLAB/omv_backup/omv_backup.py -r all"
        else:
            user = "jd"
            cmnd = "sudo /myapps/omv_backup.py -r all"
        msg = {"mode":_MODE, "status":"restore","bak_name":"s","host":s,"cur_job":"aaa","start_time":1,"end_time":1,"progress":0,"finished":1,"used_space":1}
        #logging.info(msg)

        send_mqtt_message(topic,msg)
        #continue
        if is_port_open(s,22):
            ssh = paramiko.SSHClient()
            ssh.load_system_host_keys()
            # Add SSH host key automatically if needed.
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            # Connect to router using username/password authentication.
            logger.info(f"Sync {s}")
            print(f"Sync {s}")
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            pkey = paramiko.RSAKey.from_private_key_file("/home/jd/.ssh/id_rsa")
            ssh.connect(s,
                        username=user,
                        look_for_keys=False,
                        allow_agent=False,
                        pkey=pkey)
            print(cmnd)
            ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(cmnd)
            # Stream remote stdout/stderr into the local log as it arrives.
            for line in iter(ssh_stdout.readline, ""):
                logger.info(line)
                print(line, end="")
            for line in iter(ssh_stderr.readline, ""):
                logger.info(line)
            ssh.close()
        # Best-effort removal of the restore trigger flag file.
        try:
            os.remove("/backups/restore")
        except:
            pass

    return "finished1"
|
|
|
|
|
|
if _RESTORE:
    # -r APP[,APP...] : run a restore and exit immediately.
    restore_job(_APP)
    sys.exit()
if _SSH_TEST:
    # -T : connectivity smoke test — publish a dummy status for every server
    # and run a harmless command on rack.home.lan over SSH.
    user = "root"
    cmnd = "ls -la"
    topic = "sectorq/amd/backups"
    for s in servers:
        # if s == "m-server.home.lan":
        # continue
        # elif s == "nas.home.lan":
        # user = "admin"
        # cmnd = "/share/Data/__GITLAB/omv_backup/venv/bin/python3 /share/Data/__GITLAB/omv_backup/omv_backup.py -r all"
        msg = {"mode":_MODE, "status":"restore","bak_name":"s","host":s,"cur_job":"aaa","start_time":1,"end_time":1,"progress":0,"finished":1,"used_space":1}
        #logging.info(msg)

        send_mqtt_message(topic,msg)
        # Only rack.home.lan actually gets the SSH round-trip.
        if s != "rack.home.lan":
            continue
        if is_port_open(s,22):
            ssh = paramiko.SSHClient()
            ssh.load_system_host_keys()
            # Add SSH host key automatically if needed.
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            # Connect to router using username/password authentication.
            logger.info(f"Sync {s}")
            print(f"Sync {s}")
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            pkey = paramiko.RSAKey.from_private_key_file("/home/jd/.ssh/id_rsa")
            ssh.connect(s,
                        username=user,
                        look_for_keys=False,
                        allow_agent=False,
                        pkey=pkey)
            print(cmnd)
            ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(cmnd)
            # Stream remote stdout/stderr into the local log as it arrives.
            for line in iter(ssh_stdout.readline, ""):
                logger.info(line)
                print(line, end="")
            for line in iter(ssh_stderr.readline, ""):
                logger.info(line)
            ssh.close()
|
|
|
|
# Define actions based on payload
|
|
def handle_payload(payload):
    """Dispatch an incoming MQTT command payload.

    :param payload: raw command string; matched case-insensitively. A known
        host key ("m-server" or "nas") starts a backup run for that host;
        anything else is logged as an error and ignored.
    """
    payload = payload.lower()
    # The original had two byte-identical branches for "m-server" and "nas";
    # merged into a single membership test.
    if payload in ("m-server", "nas"):
        logging.info("💡 Starting backup job")
        backup_job(payload)
        logging.info(f"💡 Finished backup job")
    else:
        logging.error(f"⚠️ Unknown command: {payload}")
|
|
|
|
# Callback when connected
|
|
def on_connect(client, userdata, flags, rc):
    """paho-mqtt connect callback: subscribe to TOPIC on success, log failures."""
    # Guard clause: anything but rc == 0 means the broker rejected us.
    if rc != 0:
        logging.error(f"❌ Connection failed with code {rc}")
        return
    logging.info("✅ Connected securely to broker")
    client.subscribe(TOPIC)
    logging.info(f"📡 Subscribed to topic: {TOPIC}")
|
|
|
|
# Callback when a message is received
|
|
def on_message(client, userdata, msg):
    """paho-mqtt message callback: log the message, then dispatch its payload."""
    decoded = msg.payload.decode()
    logging.info(f"📨 Received message: {decoded} on topic: {msg.topic}")
    handle_payload(decoded)
|
|
|
|
# MQTT client setup
# Long-lived subscriber: blocks forever waiting on TOPIC for a hostname
# payload, which on_message -> handle_payload turns into a backup_job() run.
client = mqtt.Client()
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.on_message = on_message

# Use TLS for encrypted connection
if USE_TLS:
    client.tls_set()  # You can customize this with certs if needed

# Connect and loop forever
client.connect(BROKER, PORT, keepalive=60)

# Reset the retained start-flag so a stale command does not retrigger a run.
client.publish("sectorq/backups/start", "finished", qos=0, retain=True)
client.loop_forever()