From 4cd9cfce2065d1a32297246fc70687256691c494 Mon Sep 17 00:00:00 2001 From: jaydee Date: Tue, 23 Dec 2025 22:30:57 +0100 Subject: [PATCH] build --- main.py | 689 +++++++++ portainer.py | 2030 -------------------------- __init__.py => portainer/__init__.py | 0 port.py => portainer/api.py | 0 4 files changed, 689 insertions(+), 2030 deletions(-) create mode 100755 main.py delete mode 100755 portainer.py rename __init__.py => portainer/__init__.py (100%) rename port.py => portainer/api.py (100%) diff --git a/main.py b/main.py new file mode 100755 index 0000000..bce61d6 --- /dev/null +++ b/main.py @@ -0,0 +1,689 @@ +""" +Portainer API Client module. + +This module provides a wrapper for interacting with the Portainer API +to manage endpoints, stacks, and containers. +""" + +import os +import logging +import signal +import sys +import json +import argparse +import hvac +import time +import base64 +import shutil +import requests +from portainer.api import PortainerApi +from git import Repo +from concurrent.futures import ThreadPoolExecutor +from tabulate import tabulate +from prompt_toolkit import prompt +from prompt_toolkit.completion import WordCompleter +from prompt_toolkit.shortcuts import checkboxlist_dialog +from prompt_toolkit.shortcuts import radiolist_dialog + + +VAULT_ADDR = os.environ.get("VAULT_ADDR", "http://192.168.77.101:8200") +try: + VAULT_TOKEN = os.environ.get("VAULT_TOKEN") + if VAULT_TOKEN is None: + raise KeyError +except KeyError: + VAULT_TOKEN = prompt("Valult root token : ", is_password=True) +os.environ["VAULT_TOKEN"] = VAULT_TOKEN + +client = hvac.Client(url=VAULT_ADDR, token=VAULT_TOKEN) +# Check if connected +if client.is_authenticated(): + print("Connected to Vault") +else: + raise Exception("Failed to authenticate with Vault") +# Specify the mount point of your KV engine + +VERSION = "0.1.50" + +defaults = { + "endpoint_id": "vm01", + "stack": "my_stack", + "deploy_mode": "git", + "autostart": "True", + "stack_mode": "swarm", + "site": "portainer", +} + +cur_config = {} + +def load_config(defaults=defaults): + '''Load configuration from /myapps/portainer.conf if it exists, else from env vars or defaults.''' + if os.path.exists("/myapps/portainer.conf"): + with open("/myapps/portainer.conf", "r") as f: + conf_data = f.read() + for line in conf_data.split("\n"): + if line.startswith("#") or line.strip() == "": + continue + key, value = line.split("=", 1) + os.environ[key.strip()] = value.strip() + cur_config[key.strip()] = value.strip() + else: + print("No /myapps/portainer.conf file found, proceeding with env vars.") + os.makedirs("/myapps", exist_ok=True) + + for field in defaults.keys(): + value_in = os.getenv(f"PORTAINER_{field.upper()}") + if value_in is not None: + os.environ[f"PORTAINER_{field.upper()}"] = value_in + cur_config[f"PORTAINER_{field.upper()}"] = value_in + else: + os.environ[f"PORTAINER_{field.upper()}"] = defaults[field] + cur_config[f"PORTAINER_{field.upper()}"] = defaults[field] + + conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items()) + # print("Using the following configuration:") + with open("/myapps/portainer.conf", "w") as f: + f.write(conf_data) + print("Configuration written to /myapps/portainer.conf") + return cur_config + +a = load_config(defaults) + +# ENV_VARS = [ +# "PORTAINER_URL", +# "PORTAINER_SITE", +# "PORTAINER_ENDPOINT_ID", +# "PORTAINER_STACK", +# "PORTAINER_DEPLOY_MODE", +# "PORTAINER_STACK_MODE", +# ] + + +def update_configs(cur_config): + '''Update defaults from environment variables if set.''' + conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items()) + # print("Using the following configuration:") + # print(conf_data) + with open("/myapps/portainer.conf", "w") as f: + f.write(conf_data) + print("Configuration written to /myapps/portainer.conf") + +parser = argparse.ArgumentParser( + description=f"""\ +Portainer helper - use env vars or pass credentials." +version: {VERSION} + """, + formatter_class=argparse.RawTextHelpFormatter, +) +parser.add_argument( + "--base", + "-b", + default=os.getenv("PORTAINER_URL", "https://portainer.example.com"), + help="Base URL for Portainer (ENV: PORTAINER_URL)", +) +parser.add_argument("--site", "-t", type=str, default=None, help="Site") +parser.add_argument( + "--endpoint-id", + "-e", + type=str, + default=None, + help="Endpoint ID to limit stack operations", +) +parser.add_argument( + "--service-id", + "-i", + type=str, + default=None, + help="Service ID to limit service operations", +) +parser.add_argument("--stack", "-s", type=str, default=None, nargs="+", help="Stack ID for operations") +parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform") +parser.add_argument( + "--autostart", "-Z", action="store_true", help="Auto-start created stacks" +) +parser.add_argument("--update", "-u", action="store_true", help="Update service if it exists") +parser.add_argument("--debug", "-D", action="store_true") +parser.add_argument("--launcher", "-L", action="store_true") +parser.add_argument("--gpu", "-g", action="store_true") +parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds") +parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode") +parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode") + +args = parser.parse_args() +print("Running version:", VERSION) +print("Environment:", args.site) +args.client = client +if args.site is not None: + cur_config["PORTAINER_SITE"] = args.site +if args.endpoint_id is not None: + cur_config["PORTAINER_ENDPOINT_ID"] = args.endpoint_id +if args.stack is not None: + cur_config["PORTAINER_STACK"] = args.stack +if args.deploy_mode is not None: + cur_config["PORTAINER_DEPLOY_MODE"] = args.deploy_mode +if args.stack_mode is not None: + cur_config["PORTAINER_STACK_MODE"] = args.stack_mode + +update_configs(cur_config) + +if args.debug: + input(cur_config) + +_LOG_LEVEL = "DEBUG" +LOG_FILE = "/tmp/portainer.log" +if _LOG_LEVEL == "DEBUG": + logging.basicConfig( + filename=LOG_FILE, + level=logging.DEBUG, + format="%(asctime)s : %(levelname)s : %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", + ) + logging.debug("using debug logging") +elif _LOG_LEVEL == "ERROR": + logging.basicConfig( + filename=LOG_FILE, + level=logging.ERROR, + format="%(asctime)s : %(levelname)s : %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", + ) + logging.info("using error logging") +elif _LOG_LEVEL == "SCAN": + logging.basicConfig( + filename=LOG_FILE, + level=logging.DEBUG, + format="%(asctime)s : %(levelname)s : %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", + ) + logging.info("using scan logging") +else: + logging.basicConfig( + filename=LOG_FILE, + level=logging.INFO, + format="%(asctime)s : %(levelname)s : %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", + ) +logging.info("script started") + +logger = logging.getLogger(__name__) + + +def wl(msg): + """Write log message if debug is enabled.""" + if args.debug: + print(msg) + + +def prompt_missing_args(args_in, defaults_in, fields, action=None,stacks=None): + """ + fields = [("arg_name", "Prompt text")] + """ + + longest = 0 + for field, text in fields: + a = text + " (default= " + cur_config["PORTAINER_" + field.upper()] + ")" + if len(a) > longest: + longest = len(a) + + for field, text in fields: + # print(field) + value_in = getattr(args_in, field) + default = defaults_in.get(f"PORTAINER_{field}".upper()) + cur_site = defaults_in.get("PORTAINER_SITE".upper()) + cur_env = defaults_in.get("PORTAINER_ENVIRONMENT_ID".upper()) + + # print(value_in) + if value_in is None: + if default is not None: + prompt_text = f"{text} (default={default}) : " + # value_in = input(prompt) or default + if field == "site": + commands = ["portainer", "port"] + elif field == "deploy_mode": + commands = ["git", "upload"] + elif field == "stack_mode": + commands = ["swarm", "compose"] + elif field == "endpoint_id": + commands = por.endpoints_names + elif field == "stack": + if args.action == "create_stack": + # input(json.dumps(stacks, indent=2)) + commands = [ + 'authentik', 'bitwarden', 'bookstack', 'dockermon', 'fail2ban', 'gitea', 'gitlab', 'grafana', + 'hashicorp', 'home-assistant', 'homepage', 'immich', 'influxdb', 'jupyter', 'kestra', 'mailu3', + 'mealie', 'mediacenter', 'mosquitto', 'motioneye', 'n8n', 'nebula', 'nextcloud', 'nginx', + 'node-red', 'octoprint', 'ollama', 'onlyoffice', 'paperless-ngx', 'pihole', 'portainer-ce', 'rancher', 'registry', + 'regsync', 'semaphore', 'unifibrowser', 'uptime-kuma', 'watchtower', 'wazuh', 'webhub', 'wordpress', + 'wud', 'zabbix-server'] + try: + print(por.all_data['stacks'][defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]]['by_name'].keys()) + for s in por.all_data['stacks'][defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]]['by_name'].keys(): + + #print(s) + commands.remove(s) + except KeyError: + print("No stacks found for endpoint", defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]) + + else: + commands = [] + if por._debug: + input(por.stacks_all) + # print(defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]) + try: + for s in por.stacks_all[ + defaults_in[f"PORTAINER_ENDPOINT_ID".upper()] + ]["by_name"].keys(): + commands.append(s) + except KeyError: + print( + "No stacks found for endpoint", + defaults_in[f"PORTAINER_ENDPOINT_ID".upper()], + ) + sys.exit(1) + else: + commands = [] + completer = WordCompleter( + commands, ignore_case=True, match_middle=False + ) + try: + if field == "stack": + commands.sort() + commands_tuples = [(cmd, cmd) for cmd in commands] + commands_tuples.insert(0, ("__ALL__", "[Select ALL]")) + value_in = checkboxlist_dialog( + title="Select Services", + text="Choose one or more services:", + values=commands_tuples, + ).run() + + if value_in is None: + print("Cancelled.") + sys.exit(0) + elif "__ALL__" in value_in: + # User selected "Select ALL" + value_in = commands # all real commands + + + value_in.sort() + + if "pihole" in value_in: + if action == "delete_stack": + value_in.remove("pihole") + value_in.append("pihole") + else: + value_in.remove("pihole") + value_in.insert(0, "pihole") + print(" >> Stacks :", ",".join(value_in)) + else: + value_in = ( + prompt( + f" >> {prompt_text}", + completer=completer, + placeholder=default, + ) + or default + ) + except KeyboardInterrupt: + print("\n^C received — exiting cleanly.") + sys.exit(0) + + # value_in = input_with_default(prompt_text, default, longest+2) + + else: + # value_in = input(f"{text}: ") + commands = ["start", "stop", "status", "restart", "reload", "exit"] + completer = WordCompleter(commands, ignore_case=True) + try: + value_in = ( + prompt( + f" >> {text} {default}", + completer=completer, + placeholder=default, + ) + or default + ) + except KeyboardInterrupt: + print("\n^C received — exiting cleanly.") + sys.exit(0) + + # value_in = input_with_default(text, default, longest+2) + value_in.sort() + if por._debug: + print("Value entered:", value_in) + defaults_in[f"PORTAINER_{field}".upper()] = value_in + setattr(args, field, value_in) + + if field == "site" and value_in != cur_site: + por.get_site(value_in) + if value_in == "portainer": + defaults_in["PORTAINER_ENDPOINT_ID"] = "m-s" + elif value_in == "port": + defaults_in["PORTAINER_ENDPOINT_ID"] = "vm01" + if field == "stack" and value_in != cur_site: + os.environ[field] = ",".join(value_in) + else: + os.environ[field] = value_in + if por._debug: + print(f"{defaults_in} {field} {value_in}") + if field == "endpoint_id" and value_in != defaults_in.get( + "PORTAINER_ENDPOINT_ID".upper() + ): + print("refreshing environment") + por.get_endpoints() + with open("/myapps/portainer.conf", "w") as f: + for k in defaults_in.keys(): + f.write(f"{k}={defaults_in[k]}\n") + + return args + + +if __name__ == "__main__": + # Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below. + + def signal_handler(sig, frame): + logger.warning("Killed manually %s, %s", sig, frame) + print("\nTerminated by user") + print("\033[?25h", end="") + sys.exit(0) + signal.signal(signal.SIGINT, signal_handler) + os.system("cls" if os.name == "nt" else "clear") + if args.action is None: + actions = [ + ("create_stack","create_stack"), + ("delete_stack","delete_stack"), + ("stop_stack","stop_stack"), + ("start_stack","start_stack"), + ("restart_service","restart_service"), + ("update_service","update_service"), + ("update_containers","update_containers"), + ("list_stacks","list_stacks"), + ("update_stack","update_stack"), + ("secrets","secrets"), + ("print_all_data","print_all_data"), + ("list_endpoints","list_endpoints"), + ("list_containers","list_containers"), + ("stop_containers","stop_containers"), + ("start_containers","start_containers"), + ("refresh_environment","refresh_environment"), + ("refresh_status","refresh_status"), + ("update_status","update_status"), + ] + + selected_action = radiolist_dialog( + title="Select one service", + text="Choose a service:", + values=actions + ).run() + + + + print("Selected:", selected_action) + + + + + # print("Possible actions: \n") + # i = 1 + # for a in actions: + # print(f" > {i:>2}. {a}") + # i += 1 + # ans = input("\nSelect action to perform: ") + args.action = selected_action + + + os.system("cls" if os.name == "nt" else "clear") + # Example: list endpoints + por = PortainerApi(cur_config["PORTAINER_SITE"], args) + por.set_defaults(cur_config) + if args.debug: + por._debug = True + + if args.action == "secrets": + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID"), + ], + ) + secrets = { + "gitea_runner_registration_token": "8nmKqJhkvYwltmNfF2o9vs0tzo70ufHSQpVg6ymb", + "influxdb2-admin-token": "l4c1j4yd33Du5lo", + "ha_influxdb2_admin_token": "l4c1j4yd33Du5lo", + "wordpress_db_password": "wordpress", + "wordpress_root_db_password": "wordpress", + } + for key, value in secrets.items(): + res = por.create_secret(key, value, args.endpoint_id, args.timeout) + print(res) + sys.exit() + + if args.action == "delete_stack": + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID"), + ("stack", "Stack name or ID"), + ], + action="delete_stack", + ) + + input( + f"\nDelete stack {','.join(args.stack)} on endpoint {args.endpoint_id}. Press ENTER to continue..." + ) + por.delete_stack( + args.endpoint_id, + args.stack, + ) + sys.exit() + + if args.action == "create_stack": + por.action = "create_stack" + #print(cur_config) + #print(args) + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID"), + ("stack", "Stack name or ID"), + ("stack_mode", "Stack mode (swarm or compose)"), + ("deploy_mode", "Deploy mode (git or upload)"), + ], + por, + ) + por.create_stack( + args.endpoint_id, + args.stack, + args.deploy_mode, + args.autostart, + args.stack_mode, + ) + sys.exit() + + if args.action == "stop_stack": + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID"), + ("stack", "Stack name or ID"), + ], + ) + + por.stop_stack(args.stack, args.endpoint_id) + sys.exit() + + if args.action == "start_stack": + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID"), + ("stack", "Stack name or ID"), + ], + ) + por.start_stack(args.stack, args.endpoint_id) + sys.exit() + + if args.action == "restart_service": + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID") + ], + ) + por.restart_service(args.endpoint_id, "lala") + sys.exit() + + if args.action == "update_service": + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID") + ], + ) + por.update_service() + if args.launcher: + input("\nPress ENTER to continue...") + sys.exit() + + if args.action == "update_containers": + + + + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID") + ], + ) + por.update_containers() + sys.exit() + + if args.action == "list_stacks": + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID"), + ], + ) + por.print_stacks(args) + if args.launcher: + input("Press ENTER to continue...") + # print(json.dumps(por.all_data, indent=2)) + sys.exit() + + if args.action == "list_containers": + print("Getting containers") + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID"), + ], + ) + print("\n".join(por.get_containers())) + if args.launcher: + input("\nPress ENTER to continue...") + sys.exit() + + if args.action == "update_stack": + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID") + ], + ) + + por.update_stack(args) + if args.launcher: + input("\nPress ENTER to continue...") + sys.exit() + + if args.action == "print_all_data": + print(json.dumps(por.all_data, indent=2)) + if args.launcher: + input("\nPress ENTER to continue...") + sys.exit() + + if args.action == "update_status": + por.update_status(args.endpoint_id, args.stack) + sys.exit() + + if args.action == "list_endpoints": + eps = por.get_endpoints(args) + export_data = [] + for i in eps["by_id"]: + export_data.append([i, eps["by_id"][i]]) + headers = ["EndpointId", "Name"] + print(tabulate(export_data, headers=headers, tablefmt="github")) + if args.launcher: + input("\nPress ENTER to continue...") + sys.exit() + + if args.action == "stop_containers": + # TODO: does not work + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID"), + ], + ) + if por.all_data["endpoints_status"][args.endpoint_id] != 1: + print(f"Endpoint {por.get_endpoint_name(args.endpoint_id)} is offline") + sys.exit() + print(f"Stopping containers on {por.get_endpoint_name(args.endpoint_id)}") + cont = [] + for c in por.all_data["containers"][args.endpoint_id]: + if args.stack in (c, "all"): + cont += por.all_data["containers"][args.endpoint_id][c] + por.stop_containers(args.endpoint_id, cont) + sys.exit() + + if args.action == "start_containers": + print("Starting containers") + cont = [] + # input(json.dumps(por.all_data, indent=2)) + for c in por.all_data["containers"][args.endpoint_id]: + if args.stack in (c, "all"): + cont += por.all_data["containers"][args.endpoint_id][c] + por.start_containers(args.endpoint_id, cont) + sys.exit() + if args.action == "start_containers": + print("Starting containers") + cont = [] + # input(json.dumps(por.all_data,indent=2)) + for c in por.all_data["containers"][args.endpoint_id]: + if args.stack in (c, "all"): + cont += por.all_data["containers"][args.endpoint_id][c] + por.start_containers(args.endpoint_id, cont) + sys.exit() + if args.action == "refresh_environment": + cont = por.refresh() + sys.exit() + + if args.action == "refresh_status": + if args.stack == "all": + print("Stopping all stacks...") + stcks = por.get_stacks(endpoint_id=args.endpoint_id) + else: + por.refresh_status(args.stack_id) diff --git a/portainer.py b/portainer.py deleted file mode 100755 index 2b56aff..0000000 --- a/portainer.py +++ /dev/null @@ -1,2030 +0,0 @@ -""" -Portainer API Client module. - -This module provides a wrapper for interacting with the Portainer API -to manage endpoints, stacks, and containers. -""" - -import os -import logging -import signal -import sys -import json -import argparse -import hvac -import time -import base64 -import shutil -import requests -from git import Repo -from concurrent.futures import ThreadPoolExecutor -from tabulate import tabulate -from prompt_toolkit import prompt -from prompt_toolkit.completion import WordCompleter -from prompt_toolkit.shortcuts import checkboxlist_dialog -from prompt_toolkit.shortcuts import radiolist_dialog - - -class PortainerApi: - """ - Simple wrapper around the module-level Portainer helper functions. - Instantiate with base_url and optional token/timeout and call methods - to perform API operations. - """ - - def __init__(self, site, args=None, timeout=120): - self.base_url = None - self.token = None - self.args = args - self.action = None - self._debug = False - self.timeout = timeout - self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git" - self.stack_name = None - self.stacks_all = {} - self.stack_id = None - self.stack_ids = [] - self.endpoint_name = None - self.endpoint_id = args.endpoint_id - - # self.git_url = "https://gitlab.sectorq.eu/home/docker-compose.git" - self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git" - self.repo_dir = "/tmp/docker-compose" - self.basic_stacks = [ - "pihole", - "nginx", - "mosquitto", - "webhub", - "authentik", - "bitwarden", - "mailu3", - "home-assistant", - "homepage", - ] - self.nas_stacks = self.basic_stacks + [ - "gitlab", - "bookstack", - "dockermon", - "gitea", - "grafana", - "immich", - "jupyter", - "kestra", - "mealie", - ] - self.m_server_stacks = self.basic_stacks + [ - "immich", - "zabbix-server", - "gitea", - "unifibrowser", - "mediacenter", - "watchtower", - "wazuh", - "octoprint", - "motioneye", - "kestra", - "bookstack", - "wud", - "uptime-kuma", - "registry", - "regsync", - "dockermon", - "grafana", - "nextcloud", - "semaphore", - "node-red", - "test", - "jupyter", - "paperless", - "mealie", - "n8n", - "ollama", - "rancher", - ] - self.rpi5_stacks = self.basic_stacks + ["gitlab", "bookstack", "gitea"] - self.rack_stacks = self.basic_stacks + [ - "gitlab", - "bookstack", - "dockermon", - "gitea", - "grafana", - "immich", - "jupyter", - "kestra", - "mealie", - ] - self.log_mode = False - self.hw_mode = False - self.all_data = {"containers": {}, "stacks": {}, "endpoints": {}, "services":{}} - self.get_site(site) - self.get_endpoints() - self.get_stacks() - self.refresh_in_containers() - - def set_defaults(self, config): - '''Set default configuration from provided config dictionary.''' - self.cur_config = config - - def get_site(self, site): - if site == "portainer": - self.base_url = os.getenv( - "PORTAINER_URL", "https://portainer.sectorq.eu/api" - ) - # self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc=" - token_path = "portainer/token" - self.token = self.args.client.secrets.kv.v2.read_secret_version(path=token_path)['data']['data']['value'] - elif site == "port": - self.base_url = os.getenv("PORTAINER_URL", "https://port.sectorq.eu/api") - token_path = "port/token" - self.token = self.args.client.secrets.kv.v2.read_secret_version(path=token_path)['data']['data']['value'] - else: - self.base_url = os.getenv( - "PORTAINER_URL", "https://portainer.sectorq.eu/api" - ) - self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc=" - self.get_endpoints() - self.get_stacks() - - def _is_number(self, s): - """Check if the input string is a number.""" - try: - float(s) - return True - except ValueError: - return False - - def gotify_message(self, message): - payload = { - "title": "Updates in Portainer", - "message": message, - "priority": 5 - } - '''Send a notification message via Gotify.''' - response = requests.post( - "https://gotify.sectorq.eu/message", - data=payload, - headers={"X-Gotify-Key": "ASn_fIAd5OVjm8c"} - ) - logger.debug(response.text) - # print("Status:", response.status_code) - # print("Response:", response.text) - pass - - def _api_get(self, path, timeout=120): - url = f"{self.base_url.rstrip('/')}{path}" - headers = {"X-API-Key": f"{self.token}"} - resp = requests.get(url, headers=headers, timeout=timeout) - if resp.status_code != 200: - return resp.status_code - print(f"Error: {resp.status_code} - {resp.text}") - # resp.raise_for_status() - return resp.json() - - def _api_post(self, path, json="", timeout=120): - url = f"{self.base_url.rstrip('/')}{path}" - headers = {"X-API-Key": f"{self.token}"} - # print(url) - # print(json) - resp = requests.post(url, headers=headers, json=json, timeout=timeout) - return resp.text - - def _api_put(self, path, json="", timeout=120): - url = f"{self.base_url.rstrip('/')}{path}" - headers = {"X-API-Key": f"{self.token}"} - # print(url) - # print(json) - resp = requests.put(url, headers=headers, json=json, timeout=timeout) - return resp.text - - def _api_post_file(self, path, endpoint_id, name, envs, file, timeout=120): - # input("API POST2 called. Press Enter to continue.") - """Example authenticated GET request to Portainer API.""" - url = f"{self.base_url.rstrip('/')}{path}" - headers = {"X-API-Key": f"{self.token}"} - data = {"EndpointId": endpoint_id, "Name": name, "Env": json.dumps(envs)} - # print(data) - resp = requests.post( - url, headers=headers, files=file, data=data, timeout=timeout - ) - resp.raise_for_status() - return resp.json() - - def _api_post_no_body(self, path, timeout=120): - """Example authenticated GET request to Portainer API.""" - url = f"{self.base_url.rstrip('/')}{path}" - # print(url) - headers = {"X-API-Key": f"{self.token}"} - resp = requests.post(url, headers=headers, timeout=timeout) - return resp.text - - def _api_delete(self, path, timeout=120): - """Example authenticated DELETE request to Portainer API.""" - url = f"{self.base_url.rstrip('/')}{path}" - headers = {"X-API-Key": f"{self.token}"} - resp = requests.delete(url, headers=headers, timeout=timeout) - # print(resp) - resp.raise_for_status() - # print(resp.status_code) - return resp.status_code - - def refresh(self): - '''Refresh all data from Portainer.''' - self.get_endpoints() - self.get_stacks(self) - self.get_containers(self) - return True - - def get_stacks(self, endpoint_id="all", timeout=20): - '''Get a list of stacks for a specific endpoint or all endpoints.''' - if endpoint_id != "all": - endpoint_id = self.get_endpoint_id() - path = "/stacks" - stcks = [] - stacks = self._api_get(path, timeout=timeout) - self.stacks_all = {} - fail_endponts = [20, 39, 41] - # print(json.dumps(stacks,indent=2)) - webhooks = {} - for s in stacks: - # print(type(s["AutoUpdate"]) ) - # input(s) - if s["EndpointId"] in fail_endponts: - continue - if not s["EndpointId"] in webhooks: - try: - webhooks[s["EndpointId"]] = {"webhook": {}} - webhooks[self.endpoints["by_id"][s["EndpointId"]]] = {"webhook": {}} - except Exception as e: - logger.debug( - f"Exception while getting webhooks for endpoint {s['EndpointId']}: {e}" - ) - if not s["EndpointId"] in self.stacks_all: - self.stacks_all[s["EndpointId"]] = {"by_id": {}, "by_name": {}} - self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]] = { - "by_id": {}, - "by_name": {}, - } - self.stacks_all[s["EndpointId"]]["by_id"][s["Id"]] = s["Name"] - self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_id"][ - s["Id"] - ] = s["Name"] - - self.stacks_all[s["EndpointId"]]["by_name"][s["Name"]] = s["Id"] - self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_name"][ - s["Name"] - ] = s["Id"] - # print(s) - - if "AutoUpdate" in s and s["AutoUpdate"] is not None: - if type(s["AutoUpdate"]) is dict and "Webhook" in s["AutoUpdate"]: - # print(self.endpoints["by_id"][s['EndpointId']], s['Name'], s["AutoUpdate"]['Webhook']) - # print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW") - webhooks[s["EndpointId"]][s["Name"]] = s["AutoUpdate"]["Webhook"] - webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[ - "AutoUpdate" - ]["Webhook"] - elif s["AutoUpdate"]["Webhook"] != "": - webhooks[s["EndpointId"]][s["Name"]] = s["Webhook"] - webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[ - "Webhook" - ] - - # print(self.stacks_all) - if s["EndpointId"] == endpoint_id or endpoint_id == "all": - stcks.append(s) - # print(stcks) - if stcks is None: - return [] - self.stacks = stacks - self.all_data["stacks"] = self.stacks_all - self.all_data["webhooks"] = webhooks - # input(json.dumps(self.stacks_all,indent=2)) - return stcks - - def get_services(self, endpoint, timeout=30): - '''Get a list of services for a specific stack on an endpoint.''' - # print(json.dumps(self.all_data,indent=2)) - path = f"/endpoints/{self.get_endpoint_id()}/docker/services" - # print(path) - # path += f'?filters={{"label": ["com.docker.compose.project={stack}"]}}' - services = self._api_get(path, timeout=timeout) - return services - - def update_status(self, endpoint, stack): - '''Get the update status of a specific stack on an endpoint.''' - path = f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true" - # input(path) - stats = self._api_get(path) - print(stats) - - def get_endpoint_id(self): - '''Get endpoint ID from either ID or name input.''' - # input(self.args.endpoint_id) - if self._is_number(self.args.endpoint_id): - self.endpoint_id = self.args.endpoint_id - self.endpoint_name = self.endpoints["by_id"][self.args.endpoint_id] - return self.args.endpoint_id - else: - self.endpoint_name = self.args.endpoint_id - self.endpoint_id = self.endpoints["by_name"][self.args.endpoint_id] - return self.endpoints["by_name"][self.args.endpoint_id] - - def get_endpoint_name(self, endpoint): - '''Get endpoint name from either ID or name input.''' - if self._is_number(endpoint): - self.endpoint_id = endpoint - self.endpoint_name = self.all_data["endpoints"]["by_id"][endpoint] - return self.all_data["endpoints"]["by_id"][endpoint] - else: - self.endpoint_name = endpoint - self.endpoint_id = self.all_data["endpoints"]["by_name"][endpoint] - return endpoint - - def refresh_in_containers(self): - '''Get a list of containers for a specific endpoint and stack.''' - # print(json.dumps(self.all_data,indent=2)) - # print(endpoint) - # print(stack) - cont = [] - data = {} - - eps = [ep for ep in self.all_data['endpoints']['by_id'].keys()] - #input(eps) - for endpoint in eps: - if self.all_data["endpoints_status"][endpoint] != 1: - print("Endpoint down") - # print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline") - continue - path = ( - f"/endpoints/{endpoint}/docker/containers/json?all=1" - ) - logging.info(f"request : {path}") - try: - containers = self._api_get(path) - #input(json.dumps(containers, indent=2)) - except Exception as e: - print(f"failed to get containers from {path}: {e}") - continue - contr = [] - try: - for c in containers: - #input(c) - cont.append([c["Names"][0].replace("/", ""),c["Id"], c['Image']]) - contr.append([c["Names"][0].replace("/", ""), c["Id"], c['Image']]) - if self.all_data["endpoints"]["by_id"][endpoint] in data: - data[self.all_data["endpoints"]["by_id"][endpoint]] = contr - data[endpoint] = contr - else: - data[self.all_data["endpoints"]["by_id"][endpoint]] = contr - data[endpoint] = contr - except Exception as e: - logger.debug( - f"Exception while getting containers for stack {e} ", - f"on endpoint {self.all_data['endpoints']['by_id'][endpoint]}: {e}", - ) - self.all_data["containers"] = data - - #print(cont) - return cont - - def get_containers(self): - '''Get a list of containers for a specific endpoint and stack.''' - # print(json.dumps(self.all_data,indent=2)) - # print(endpoint) - # print(stack) - cont = [] - data = {} - if self.args.endpoint_id == "all": - eps = [ep for ep in self.all_data['endpoints']['by_id'].keys()] - else: - - eps = [self.get_endpoint_id()] - - for endpoint in eps: - #print(self.args.stack) - if self.args.stack in ["all", None]: - # input([id for id in self.all_data["stacks"][endpoint]['by_id'].keys()]) - for e in [id for id in self.all_data["stacks"][endpoint]['by_name'].keys()]: - #input(e) - # if s not in self.all_data["stacks"]: - # continue - #input(self.all_data) - if self.all_data["endpoints_status"][endpoint] != 1: - # print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline") - continue - # input(self.all_data["stacks"][endpoint]["by_name"]) - - #input(e) - path = ( - f"/endpoints/{endpoint}/docker/containers/json" - f'?all=1&filters={{"label": ["com.docker.compose.project={e}"]}}' - ) - logging.info(f"request : {path}") - try: - containers = self._api_get(path) - #input(containers) - except Exception as e: - print(f"failed to get containers from {path}: {e}") - continue - contr = [] - try: - for c in containers: - # input(c) - cont.append(c["Names"][0].replace("/", "")) - contr.append(c["Names"][0].replace("/", "")) - if self.all_data["endpoints"]["by_id"][endpoint] in data: - data[self.all_data["endpoints"]["by_id"][endpoint]][e] = contr - else: - data[self.all_data["endpoints"]["by_id"][endpoint]] = { - e: contr - } - except Exception as e: - logger.debug( - f"Exception while getting containers for stack {e} ", - f"on endpoint {self.all_data['endpoints']['by_id'][endpoint]}: {e}", - ) - - self.all_data["containers"] = data - - #print(cont) - return cont - - def stop_containers(self, endpoint, containers, timeout=130): - '''Stop containers on an endpoint.''' - if self.all_data["endpoints_status"][endpoint] != 1: - print(f"Endpoint {self.get_endpoint_name(endpoint)} is offline") - ep_id = self.endpoints["by_name"][endpoint] - - def stop(c): - print(f" > Stopping {c}") - self._api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/stop") - # print(f"✔") - - with ThreadPoolExecutor(max_workers=10) as exe: - exe.map(stop, containers) - # for c in containers: - # print(f" > Stopping {c}") - # self._api_post_no_body(f"/endpoints/{self.endpoints["by_name"][endpoint]}/docker/containers/{c}/stop") - # return 0 - - def start_containers(self, endpoint, containers, timeout=130): - '''Start containers on an endpoint.''' - ep_id = self.endpoints["by_name"][endpoint] - - def stop(c): - print(f" > Starting {c}") - self._api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/start") - - with ThreadPoolExecutor(max_workers=10) as exe: - exe.map(stop, containers) - - def update_stack(self, args): - '''Update one stack or all stacks on an endpoint.''' - #print("Updating stacks") - stacks = self.get_stacks(endpoint_id=args.endpoint_id) - stacks_tuples = [] - - for s in stacks: - #print(s) - try: - stacks_tuples.append((s['AutoUpdate']['Webhook'],s['Name'])) - # print(s['Name'], " : ", s['AutoUpdate']['Webhook']) - except: - stacks_tuples.append((s['Webhook'],s['Name'])) - # print(s['Name'], " : ", s['Webhook']) - stacks_dict = dict(stacks_tuples) - print(stacks_dict) - #input(stacks_tuples) - # stacks_tuples = [(s['AutoUpdate']['Webhook'], s['Name']) for s in stacks if "Webhook" in s['AutoUpdate'] ] - - def update(c): - print(f" > Updating {c[1]} ") - ans = self._api_post_no_body(f"/stacks/webhooks/{c[0]}") - logger.debug( - f"Update response for stack {c[0]} on endpoint {ans}" - ) - # input(stacks_tuples) - if args.debug: - input(args) - stacks_tuples = sorted(stacks_tuples, key=lambda x: x[1]) - stack_dict = dict(stacks_tuples) - # input(service_tuples) - if self.args.service_id is None: - #services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] - stacks_tuples.insert(0, ("__ALL__", "[Select ALL]")) - stack_ids = checkboxlist_dialog( - title="Select one stack to update", - text="Choose a service:", - values=stacks_tuples - ).run() - stcs = [] - input(stack_ids) - - if args.stack == "all": - for s in stack_dict: - stcs.append([s, stack_dict[s]]) - else: - for s in stack_dict: - if s in stack_ids: - stcs.append([s, stack_dict[s]]) - - print(stcs) - with ThreadPoolExecutor(max_workers=10) as exe: - list(exe.map(update, stcs)) - - input('UPDATED') - if not args.autostart: - time.sleep(120) - cont = [] - for c in self.all_data["containers"][args.endpoint_id]: - if args.stack == c or args.stack == "all": - cont += self.all_data["containers"][args.endpoint_id][c] - self.stop_containers(args.endpoint_id, cont) - - def get_endpoints(self, timeout=10): - '''Get a list of all endpoints.''' - endpoints = self._api_get("/endpoints") - eps = {"by_id": {}, "by_name": {}} - eps_stats = {} - for ep in endpoints: - eps["by_id"][ep["Id"]] = ep["Name"] - eps["by_name"][ep["Name"]] = ep["Id"] - eps_stats[ep["Id"]] = ep["Status"] - eps_stats[ep["Name"]] = ep["Status"] - self.endpoints = eps - self.endpoints_names = list(eps["by_name"]) - self.all_data["endpoints"] = eps - self.all_data["endpoints_status"] = eps_stats - # input(eps_stats) - # input(eps) - return eps - - def get_endpoint(self, endpoint_id=None, timeout=30): - '''Get endpoint ID and name from either ID or name input.''' - self.get_endpoints() - # print(self.endpoints) - if self._is_number(endpoint_id): - self.endpoint_name = self.endpoints["by_id"][endpoint_id] - self.endpoint_id = endpoint_id - else: - self.endpoint_name = endpoint_id - self.endpoint_id = self.endpoints["by_name"][endpoint_id] - return self.endpoint_id - - def get_swarm_id(self, endpoint): - '''Get the swarm ID for a specific endpoint.''' - ep_id = self.endpoints["by_name"][endpoint] - path = f"/endpoints/{ep_id}/docker/info" - stats = self._api_get(path) - return stats["Swarm"]["Cluster"]["ID"] - - def get_stack(self, stack=None, endpoint_id=None, timeout=None): - self.get_stacks(endpoint_id) - if not self._is_number(endpoint_id): - endpoint_id = int(self.endpoints["by_name"][endpoint_id]) - self.stack_id = [] - if stack == "all": - for s in self.stacks: - # print(s) - if endpoint_id == s.get("EndpointId"): - self.stack_ids.append(s.get("Id")) - return self.stack_ids - else: - for s in self.stacks: - # print(s) - match_by_id = ( - stack is not None - and s.get("Id") == stack - and endpoint_id == s.get("EndpointId") - ) - - match_by_name = str(s.get("Name")) == str(stack) and endpoint_id == int( - s.get("EndpointId") - ) # Ensure types match for comparison - - if match_by_id or match_by_name: - # if (stack is not None and s.get("Id") == stack and endpoint_id == s.get("EndpointId")) - # or str(s.get("Name")) == str(stack) and endpoint_id == int(s.get("EndpointId")): - self.stack_id = s.get("Id") - self.stack_name = s.get("Name") - self.stack_ids.append(s.get("Id")) - return s - RED = "\033[91m" - RESET = "\033[0m" - print(ValueError(f"{RED}✗{RESET} >> Stack not found: {stack}")) - return 1 - - def create_stack( - self, - endpoint, - stacks=None, - mode="git", - autostart=False, - stack_mode="swarm", - ): - for stack in stacks: - if stack_mode == "swarm": - swarm_id = self.get_swarm_id(endpoint) - p = "swarm" - env_path = f"{self.repo_dir}/__swarm/{stack}/.env" - else: - p = "standalone" - env_path = f"{self.repo_dir}/{stack}/.env" - # input(swarm_id) - self.endpoint_id = self.get_endpoint_id() - if os.path.exists(self.repo_dir): - shutil.rmtree(self.repo_dir) - else: - print(f"Folder '{self.repo_dir}' does not exist.") - Repo.clone_from(self.git_url, self.repo_dir) - if mode == "git": - path = f"/stacks/create/{p}/repository" - # print(p) - if self.endpoint_id is not None: - path += f"?endpointId={self.endpoint_id}" - - if stack == "all": - if self.endpoint_name == "rack": - stacks = self.rack_stacks - elif self.endpoint_name == "m-server": - stacks = self.m_server_stacks - elif self.endpoint_name == "rpi5": - stacks = self.rpi5_stacks - elif self.endpoint_name == "nas": - stacks = self.nas_stacks - else: - stacks = [stack] - # print(json.dumps(self.stacks_all, indent=2)) - # input(json.dumps(self.stacks_all, indent=2)) - for stack in stacks: - if self.endpoint_id in self.stacks_all: - - # Check if the stack exists by ID or name - stack_check = ( - stack in self.stacks_all[self.endpoint_id]["by_id"] - or stack in self.stacks_all[self.endpoint_id]["by_name"] - ) - if stack_check: - GREEN = "\033[92m" - RESET = "\033[0m" - print(f"{GREEN}✓{RESET} >> Stack {stack} already exist") - continue - print(f"Working on {stack} , stack mode: {stack_mode}") - - envs = [] - if os.path.exists(f"{env_path}"): - f = open(f"{env_path}", "r") - env_vars = f.read().splitlines() - for ev in env_vars: - if ev.startswith("#") or ev.strip() == "": - continue - if "=" in ev: - name, value = ev.split("=", 1) - envs.append({"name": name, "value": value}) - f.close() - # wl(envs) - for e in envs: - # print(f"Env: {e['name']} = {e['value']}") - HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"] - if e["name"] == "RESTART" and self.endpoint_name == "m-server": - e["value"] = "always" - if e["name"] in HWS: - # print("Found HW_MODE env var.") - if self.hw_mode: - e["value"] = "hw" - else: - e["value"] = "cpu" - if e["name"] == "LOGGING": - # print("Found LOGGING env var.") - if self.log_mode: - e["value"] = "journald" - else: - e["value"] = "syslog" - - uid = uuid.uuid4() - # print(uid) - req = { - "Name": stack, - "Env": envs, - "AdditionalFiles": [], - "AutoUpdate": { - "forcePullImage": True, - "forceUpdate": False, - "webhook": f"{uid}", - }, - "repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git", - "ReferenceName": "refs/heads/main", - "composeFile": f"{stack}/docker-compose.yml", - "ConfigFilePath": f"{stack}/docker-compose.yml", - "repositoryAuthentication": True, - "repositoryUsername": "jaydee", - "repositoryPassword": "glpat-uj-n-eEfTY398PE4vKSS", - "AuthorizationType": 0, - "TLSSkipVerify": False, - "supportRelativePath": True, - "repositoryAuthentication": True, - "fromAppTemplate": False, - "registries": [6, 3], - "FromAppTemplate": False, - "Namespace": "", - "CreatedByUserId": "", - "Webhook": "", - "filesystemPath": "/share/docker_data/portainer/portainer-data/", - "RegistryID": 4, - "isDetachedFromGit": True, - "method": "repository", - "swarmID": None, - } - if stack_mode == "swarm": - req["type"] = "swarm" - req["swarmID"] = swarm_id - req["composeFile"] = f"__swarm/{stack}/{stack}-swarm.yml" - req["ConfigFilePath"] = f"__swarm/{stack}/{stack}-swarm.yml" - if self._debug: - print(json.dumps(req)) - res = self._api_post(path, req) - if "Id" in res: - # print("Deploy request OK") - pass - else: - print(res) - tries = 0 - created = False - while True: - try: - # print(self.endpoint_id) - # print(stack) - if self.get_stack(stack, self.endpoint_id) != 1: - created = True - break - except Exception as e: - print( - f"Waiting for stack {stack} to be created...{tries}/50", - end="\r", - ) - time.sleep(10) - tries += 1 - if tries > 50: - print( - f"Error retrieving stack {stack} after creation: {self.endpoint_name}" - ) - break - logger.debug(f"Exception while getting stack {stack}: {e}") - - if created: - if stack != "pihole": - # print(autostart) - if not autostart: - # self.get_stacks() - # self.stop_stack(stack,self.endpoint_id) - conts = self.get_containers() - # print(conts) - self.stop_containers(self.endpoint_name, conts) - - if mode == "file": - print("Creating new stack from file...") - path = "/stacks/create/standalone/file" - if self.endpoint_id is not None: - path += f"?endpointId={self.endpoint_id}" - - if stack == "all": - if self.endpoint_name == "rack": - stacks = self.rack_stacks - elif self.endpoint_name == "m-server": - stacks = self.m_server_stacks - elif self.endpoint_name == "rpi5": - stacks = self.rpi5_stacks - else: - stacks = [stack] - for stack in stacks: - print(f"Working on {stack}") - if os.path.exists(f"{self.repo_dir}/{stack}/.env"): - f = open(f"{self.repo_dir}/{stack}/.env", "r") - - env_vars = f.read().splitlines() - envs = [] - for ev in env_vars: - if ev.startswith("#") or ev.strip() == "": - continue - if "=" in ev: - name, value = ev.split("=", 1) - envs.append({"name": name, "value": value}) - f.close() - # wl(envs) - for e in envs: - # print(f"Env: {e['name']} = {e['value']}") - HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"] - if e["name"] == "RESTART" and self.endpoint_name == "m-server": - e["value"] = "always" - if e["name"] in HWS: - print("Found HW_MODE env var.") - if self.hw_mode: - e["value"] = "hw" - else: - e["value"] = "cpu" - if e["name"] == "LOGGING": - print("Found LOGGING env var.") - if self.log_mode: - e["value"] = "journald" - else: - e["value"] = "syslog" - - file = { - # ("filename", file_object) - "file": ( - "docker-compose.yml", - open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb"), - ), - } - self._api_post_file(path, self.endpoint_id, stack, envs, file) - - def print_stacks(self, args): - """Print a table of stacks, optionally filtered by endpoint.""" - stacks = self.get_stacks() - count = 0 - data = [] - stack_names = [] - for stack in stacks: - # print(stack) - if args.endpoint_id is not None: - if not stack["EndpointId"] in self.endpoints["by_id"]: - continue - if args.endpoint_id != "all": - if self.endpoints["by_name"][args.endpoint_id] != stack["EndpointId"]: - continue - try: - stack_names.append(stack["Name"]) - data.append( - [ - stack["Id"], - stack["Name"], - self.endpoints["by_id"][stack["EndpointId"]], - ] - ) - except KeyError as e: - data.append([stack["Id"], stack["Name"], "?"]) - logger.debug( - "KeyError getting endpoint name for stack %s : %s", stack["Name"], e - ) - count += 1 - - data = sorted(data, key=lambda x: x[1]) - headers = ["StackID", "Name", "Endpoint"] - print(tabulate.tabulate(data, headers=headers, tablefmt="github")) - print(f"Total stacks: {count}") - input("Continue...") - # print(sorted(stack_names)) - - def update_containers(self): - all_containers = self.all_data["containers"][self.args.endpoint_id] - #input(all_containers) - service_tuples = [(s[1], s[0]) for s in all_containers if "." not in s[0] and not s[0].startswith("runner-")] - service_tuples = sorted(service_tuples, key=lambda x: x[1]) - service_dict = dict(service_tuples) - # input(service_tuples) - if self.args.service_id is None: - #services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] - service_tuples.insert(0, ("__ALL__", "[Select ALL]")) - service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]")) - service_ids = checkboxlist_dialog( - title="Select one service", - text="Choose a service:", - values=service_tuples - ).run() - elif self.args.service_id == "all": - service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" ] - else: - service_ids = [self.args.service_id] - - if self.args.update is False: - if "__ONLY_CHECK__" in service_ids: - service_ids.remove("__ONLY_CHECK__") - pull = False - print("Checking for updates only...") - else: - pull = True - print("Checking for updates and pulling updates...") - else: - pull = True - print("Checking for updates and pulling updates...") - if "__ALL__" in service_ids: - service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"] - - longest = 0 - for a in service_dict.items(): - # print(a[1]) - if len(a[1]) > longest: - longest = len(a[1]) - #print(longest) - ok = "\033[92m✔\033[0m" - err = "\033[91m✖\033[0m" - updates = [] - for service_id in service_ids: - # print(self.all_data["containers"][self.args.endpoint_id]) - - print("\033[?25l", end="") - print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True) - path = f"/docker/{self.get_endpoint_id()}/containers/{service_id}/image_status?refresh=true" - - try: - resp = self._api_get(path, timeout=20) - except ValueError as e: - print(f"Error restarting service: {e}") - return [] - #print(resp) - if resp == 500: - print("?") - elif resp['Status'] == "outdated": - if pull: - #print("Recreate") - self.recreate_container(service_id, pull) - #print(f"Service {service_dict[service_id]:<{longest}} : updated") - updates.append(service_dict[service_id]) - print(ok, end=" ") - for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]: - if name.startswith(service_dict[service_id]): - print(image) - else: - print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True) - #print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m") - updates.append(service_dict[service_id]) - print(err, end=" ") - for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]: - if name.startswith(service_dict[service_id]): - print(image) - else: - print(ok, end=" ") - for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]: - if name.startswith(service_dict[service_id]): - print(image) - if len(updates) > 0: - if pull: - self.gotify_message(f"Services updated: {', '.join(updates)}") - else: - self.gotify_message(f"Services updates available: {', '.join(updates)}") - print("\033[?25h", end="") - return True - - def update_service(self): - all_services = self.get_services(self.get_endpoint_id()) - #input(all_services) - service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services] - service_tuples = sorted(service_tuples, key=lambda x: x[1]) - service_dict = dict(service_tuples) - # input(service_tuples) - if self.args.service_id is None: - #services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] - service_tuples.insert(0, ("__ALL__", "[Select ALL]")) - service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]")) - service_ids = checkboxlist_dialog( - title="Select one service", - text="Choose a service:", - values=service_tuples - ).run() - if "__ONLY_CHECK__" in service_ids: - self.args.update = False - else: - self.args.update = True - if "__ALL__" in service_ids: - service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"] - - elif self.args.service_id == "all": - service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"] - else: - service_ids = [self.args.service_id] - - if self.args.update: - pull = True - print("Checking for updates and pulling updates...") - else: - pull = False - print("Checking for updates only...") - - - longest = 0 - for a in service_dict.items(): - if a[0] == "__ONLY_CHECK__": - continue - # print(a[1]) - if len(a[1]) > longest: - longest = len(a[1]) - #print(longest) - ok = "\033[92m✔\033[0m" - err = "\033[91m✖\033[0m" - for service_id in service_ids: - print("\033[?25l", end="") - print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True) - path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true" - - try: - resp = self._api_get(path, timeout=20) - except ValueError as e: - print(f"Error restarting service: {e}") - return [] - - if resp['Status'] == "outdated": - if pull: - self.restart_srv(service_id, pull) - #print(f"Service {service_dict[service_id]:<{longest}} : updated") - self.gotify_message(f"Service {service_dict[service_id]} updated") - print(f"{ok} updated") - else: - print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True) - #print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m") - self.gotify_message(f"Service update available for {service_dict[service_id]}") - print(err) - else: - print(ok) - print("\033[?25h", end="") - return True - - def update_service2(self): - all_services = self.get_services(self.get_endpoint_id(self.args.endpoint_id)) - - service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services] - service_tuples = sorted(service_tuples, key=lambda x: x[1]) - service_dict = dict(service_tuples) - # input(service_tuples) - if self.args.service_id is None: - #services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] - service_tuples.insert(0, ("__ALL__", "[Select ALL]")) - service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]")) - service_ids = checkboxlist_dialog( - title="Select one service", - text="Choose a service:", - values=service_tuples - ).run() - elif self.args.service_id == "all": - service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"] - else: - service_ids = [self.args.service_id] - if "__ONLY_CHECK__" in service_ids or self.args.update is False: - pull = False - print("Checking for updates only...") - else: - print("Checking for updates and pulling updates...") - pull = True - if "__ALL__" in service_ids: - service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"] - - longest = 0 - for a in service_dict.items(): - # print(a[1]) - if len(a[1]) > longest: - longest = len(a[1]) - #print(longest) - ok = "\033[92m✔\033[0m" - err = "\033[91m✖\033[0m" - for service_id in service_ids: - print("\033[?25l", end="") - print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True) - path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true" - - try: - resp = self._api_get(path, timeout=20) - except ValueError as e: - print(f"Error restarting service: {e}") - return [] - - if resp['Status'] == "outdated": - if pull: - self.restart_srv(service_id, pull) - #print(f"Service {service_dict[service_id]:<{longest}} : updated") - self.gotify_message(f"Service {service_dict[service_id]} updated") - print(ok) - else: - print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True) - #print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m") - self.gotify_message(f"Service update available for {service_dict[service_id]}") - print(err) - else: - print(ok) - print("\033[?25h", end="") - return True - - def recreate_container(self,service_id, pull=False): - """Restart a service on an endpoint.""" - path = f"/docker/{self.endpoint_id}/containers/{service_id}/recreate" - # print(path) - params={"pullImage": pull} - try: - resp = self._api_post(path, json=params, timeout=20) - #print(resp) - except ValueError as e: - print(f"Error restarting service: {e}") - return [] - - def restart_srv(self,service_id, pool=False): - """Restart a service on an endpoint.""" - path = f"/endpoints/{self.endpoint_id}/forceupdateservice" - params={"serviceID": service_id, "pullImage": pool} - try: - resp = self._api_put(path, json=params, timeout=20) - # print(resp) - except ValueError as e: - print(f"Error restarting service: {e}") - return [] - - def restart_service(self, endpoint_id, service_id): - stacks = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] - stacks = sorted(stacks, key=lambda x: x[1]) - stack_id = radiolist_dialog( - title="Select one service", - text="Choose a service:", - values=stacks - ).run() - service_dict = dict(stacks) - services = self.get_services(self.endpoint_name, stack_id) - svc_name = service_dict.get(stack_id) - stack_svcs = [] - svc_menu = [] - for s in services: - try: - if svc_name in s['Spec']['Name']: - stack_svcs.append([s['Version']['Index'], s['Spec']['Name']]) - svc_menu.append([s['ID'], s['Spec']['Name']]) - except KeyError as e: - print(e) - - - service_id = radiolist_dialog( - title="Select one service", - text="Choose a service:", - values=svc_menu - ).run() - - self.restart_srv(service_id, False) - - print(f"Service {service_id} : restarted") - return True - - def start_stack(self, stack=None, endpoint_id=None): - """Start one stack or all stacks on an endpoint.""" - if endpoint_id is not None: - print("Getting endpoint") - self.get_endpoint(endpoint_id) - if stack is not None: - for s in stack: - self.stack_ids = [self._resolve_stack_id(s, endpoint_id)] - for stck in self.stack_ids: - path = f"/stacks/{stck}/start" - if self.endpoint_id is not None: - path += f"?endpointId={self.endpoint_id}" - try: - resp = self._api_post_no_body(path, timeout=20) - except ValueError as e: - print(f"Error stoping stack: {e}") - return [] - if "Id" in json.loads(resp): - print( - f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : started" - ) - else: - print( - f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : {json.loads(resp)['message']}" - ) - return True - - def stop_stack(self, stack, endpoint_id): - - """Stop one stack or all stacks on an endpoint.""" - print(f"Stopping stack {stack}") - - if endpoint_id is not None: - self.get_endpoint(endpoint_id) - - if stack is not None: - for s in stack: - self.stack_ids = [self._resolve_stack_id(s, endpoint_id)] - # print(self.stack_ids) - for stck in self.stack_ids: - path = f"/stacks/{stck}/stop" - # print(path) - if self.endpoint_id is not None: - path += f"?endpointId={self.endpoint_id}" - try: - resp = self._api_post_no_body(path, timeout=120) - except NameError as e: - print(f"Error stopping stack: {e}") - return [] - if "Id" in json.loads(resp): - print( - f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : stopped" - ) - else: - print( - f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : {json.loads(resp)['message']}" - ) - return True - - def _resolve_endpoint(self, endpoint_id): - - self.get_endpoints() - if self._debug: - print(endpoint_id) - print(self.endpoints) - if self._is_number(endpoint_id): - self.endpoint_id = int(endpoint_id) - self.endpoint_name = self.endpoints["by_id"][self.endpoint_id] - else: - self.endpoint_name = endpoint_id - self.endpoint_id = int(self.endpoints["by_name"][endpoint_id]) - - def _resolve_stack_id(self, stack, endpoint_id): - if stack == "all": - return "all" - - if not self._is_number(stack): - result = self.get_stack(stack, endpoint_id) - return result["Id"] - - return int(stack) - - def _delete_all_stacks(self, endpoint_id): - stacks = self.get_stacks(endpoint_id) - paths = [] - - for s in stacks: - if int(s["EndpointId"]) != int(endpoint_id): - continue - - path = f"/stacks/{s['Id']}?endpointId={endpoint_id}&removeVolumes=true" - paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path]) - - def delete_item(item): - print(f"Delete stack {item[1]} from {item[0]}") - out = self._api_delete(item[2]) - logger.debug("Deleted stack %s from %s: %s", item[1], item[0], out) - - with ThreadPoolExecutor(max_workers=10) as exe: - exe.map(delete_item, paths) - - return "Done" - - def _delete_single_stack(self, stack_id, endpoint_id): - path = f"/stacks/{stack_id}?endpointId={endpoint_id}&removeVolumes=true" - # print(path) - try: - out = self._api_delete(path,timeout=240) - except ValueError as e: - msg = str(e) - if "Conflict for url" in msg: - print("Stack with this name may already exist.") - else: - print(f"Error deleting stack: {e}") - return [] - - return out or [] - - def delete_stack(self, endpoint_id=None, stack=None): - """Delete one stack or all stacks on an endpoint.""" - self._resolve_endpoint(endpoint_id) - endpoint_id = self.endpoint_id - - if stack == "all": - return self._delete_all_stacks(endpoint_id) - else: - for s in stack: - print(f" >> Deleting stack {s} from endpoint {self.endpoint_name}") - stack_id = self._resolve_stack_id(s, endpoint_id) - self._delete_single_stack(stack_id, endpoint_id) - return "Done" - - # def delete_stack(self, endpoint_id=None, stack=None): - # """ - # Return a list of stacks. If endpoint_id is provided, it will be added as a query param. - # """ - # self.get_endpoints() - # if self._is_number(endpoint_id): - # self.endpoint_name = self.endpoints["by_id"][endpoint_id] - # self.endpoint_id = endpoint_id - # else: - # self.endpoint_name = endpoint_id - # self.endpoint_id = self.endpoints["by_name"][endpoint_id] - - # if not self._is_number(endpoint_id): - # endpoint_id = int(self.endpoints["by_name"][endpoint_id]) - - # if not self._is_number(stack) and stack != "all": - # # print(stack) - # # print(self.endpoint_id) - # stack = self.get_stack(stack, self.endpoint_id)["Id"] - # if stack == "all": - # stacks = self.get_stacks(self.endpoint_id) - # paths = [] - # for s in stacks: - # # print(f"Delete stack {s['Name']}") - # # print(s['EndpointId'], endpoint_id) - # if int(s["EndpointId"]) != int(endpoint_id): - # continue - # # print("Deleting stack:", s['Name']) - # path = f"/stacks/{s['Id']}" - # if endpoint_id is not None: - # path += f"?endpointId={endpoint_id}&removeVolumes=true" - # paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path]) - # # input(paths) - - # def delete(c): - # print(f"Delete stack {c[1]} from {c[0]} ") - # out = self._api_delete(c[2]) - # logger.debug(f"Deleted stack {c[1]} from {c[0]}: {out}") - - # with ThreadPoolExecutor(max_workers=10) as exe: - # exe.map(delete, paths) - # return "Done" - # else: - # path = f"/stacks/{stack}" - - # if endpoint_id is not None: - # path += f"?endpointId={endpoint_id}&removeVolumes=true" - # # print(path) - # try: - # # print(path) - # # print(base_url) - # # print(token) - # stacks = self._api_delete(path) - # except Exception as e: - # # print(f"Error creating stack: {e}") - # if "Conflict for url" in str(e): - # print("Stack with this name may already exist.") - # else: - # print(f"Error deleting stack: {e}") - # # print(stacks) - # return [] - # if stacks is None: - # return [] - - # return stacks - - def create_secret(self, name, value, endpoint_id=None, timeout=None): - """Create a Docker secret on the specified endpoint.""" - endpoint_id = int(self.endpoints["by_name"][endpoint_id]) - path = f"/endpoints/{endpoint_id}/docker/secrets/create" - encoded = base64.b64encode(value.encode()).decode() - data = {"Name": name, "Data": encoded} - - return self._api_post(path, data, timeout=timeout) - - - -VAULT_ADDR = os.environ.get("VAULT_ADDR", "http://192.168.77.101:8200") -try: - VAULT_TOKEN = os.environ.get("VAULT_TOKEN") - if VAULT_TOKEN is None: - raise KeyError -except KeyError: - VAULT_TOKEN = prompt("Valult root token : ", is_password=True) -os.environ["VAULT_TOKEN"] = VAULT_TOKEN - -client = hvac.Client(url=VAULT_ADDR, token=VAULT_TOKEN) -# Check if connected -if client.is_authenticated(): - print("Connected to Vault") -else: - raise Exception("Failed to authenticate with Vault") -# Specify the mount point of your KV engine - -VERSION = "0.1.50" - -defaults = { - "endpoint_id": "vm01", - "stack": "my_stack", - "deploy_mode": "git", - "autostart": "True", - "stack_mode": "swarm", - "site": "portainer", -} - -cur_config = {} - -def load_config(defaults=defaults): - '''Load configuration from /myapps/portainer.conf if it exists, else from env vars or defaults.''' - if os.path.exists("/myapps/portainer.conf"): - with open("/myapps/portainer.conf", "r") as f: - conf_data = f.read() - for line in conf_data.split("\n"): - if line.startswith("#") or line.strip() == "": - continue - key, value = line.split("=", 1) - os.environ[key.strip()] = value.strip() - cur_config[key.strip()] = value.strip() - else: - print("No /myapps/portainer.conf file found, proceeding with env vars.") - os.makedirs("/myapps", exist_ok=True) - - for field in defaults.keys(): - value_in = os.getenv(f"PORTAINER_{field.upper()}") - if value_in is not None: - os.environ[f"PORTAINER_{field.upper()}"] = value_in - cur_config[f"PORTAINER_{field.upper()}"] = value_in - else: - os.environ[f"PORTAINER_{field.upper()}"] = defaults[field] - cur_config[f"PORTAINER_{field.upper()}"] = defaults[field] - - conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items()) - # print("Using the following configuration:") - with open("/myapps/portainer.conf", "w") as f: - f.write(conf_data) - print("Configuration written to /myapps/portainer.conf") - return cur_config - -a = load_config(defaults) - -# ENV_VARS = [ -# "PORTAINER_URL", -# "PORTAINER_SITE", -# "PORTAINER_ENDPOINT_ID", -# "PORTAINER_STACK", -# "PORTAINER_DEPLOY_MODE", -# "PORTAINER_STACK_MODE", -# ] - - -def update_configs(cur_config): - '''Update defaults from environment variables if set.''' - conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items()) - # print("Using the following configuration:") - # print(conf_data) - with open("/myapps/portainer.conf", "w") as f: - f.write(conf_data) - print("Configuration written to /myapps/portainer.conf") - -parser = argparse.ArgumentParser( - description=f"""\ -Portainer helper - use env vars or pass credentials." -version: {VERSION} - """, - formatter_class=argparse.RawTextHelpFormatter, -) -parser.add_argument( - "--base", - "-b", - default=os.getenv("PORTAINER_URL", "https://portainer.example.com"), - help="Base URL for Portainer (ENV: PORTAINER_URL)", -) -parser.add_argument("--site", "-t", type=str, default=None, help="Site") -parser.add_argument( - "--endpoint-id", - "-e", - type=str, - default=None, - help="Endpoint ID to limit stack operations", -) -parser.add_argument( - "--service-id", - "-i", - type=str, - default=None, - help="Service ID to limit service operations", -) -parser.add_argument("--stack", "-s", type=str, default=None, nargs="+", help="Stack ID for operations") -parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform") -parser.add_argument( - "--autostart", "-Z", action="store_true", help="Auto-start created stacks" -) -parser.add_argument("--update", "-u", action="store_true", help="Update service if it exists") -parser.add_argument("--debug", "-D", action="store_true") -parser.add_argument("--launcher", "-L", action="store_true") -parser.add_argument("--gpu", "-g", action="store_true") -parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds") -parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode") -parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode") - -args = parser.parse_args() -print("Running version:", VERSION) -print("Environment:", args.site) -args.client = client -if args.site is not None: - cur_config["PORTAINER_SITE"] = args.site -if args.endpoint_id is not None: - cur_config["PORTAINER_ENDPOINT_ID"] = args.endpoint_id -if args.stack is not None: - cur_config["PORTAINER_STACK"] = args.stack -if args.deploy_mode is not None: - cur_config["PORTAINER_DEPLOY_MODE"] = args.deploy_mode -if args.stack_mode is not None: - cur_config["PORTAINER_STACK_MODE"] = args.stack_mode - -update_configs(cur_config) - -if args.debug: - input(cur_config) - -_LOG_LEVEL = "DEBUG" -LOG_FILE = "/tmp/portainer.log" -if _LOG_LEVEL == "DEBUG": - logging.basicConfig( - filename=LOG_FILE, - level=logging.DEBUG, - format="%(asctime)s : %(levelname)s : %(message)s", - datefmt="%m/%d/%Y %I:%M:%S %p", - ) - logging.debug("using debug logging") -elif _LOG_LEVEL == "ERROR": - logging.basicConfig( - filename=LOG_FILE, - level=logging.ERROR, - format="%(asctime)s : %(levelname)s : %(message)s", - datefmt="%m/%d/%Y %I:%M:%S %p", - ) - logging.info("using error logging") -elif _LOG_LEVEL == "SCAN": - logging.basicConfig( - filename=LOG_FILE, - level=logging.DEBUG, - format="%(asctime)s : %(levelname)s : %(message)s", - datefmt="%m/%d/%Y %I:%M:%S %p", - ) - logging.info("using scan logging") -else: - logging.basicConfig( - filename=LOG_FILE, - level=logging.INFO, - format="%(asctime)s : %(levelname)s : %(message)s", - datefmt="%m/%d/%Y %I:%M:%S %p", - ) -logging.info("script started") - -logger = logging.getLogger(__name__) - - -def wl(msg): - """Write log message if debug is enabled.""" - if args.debug: - print(msg) - - -def prompt_missing_args(args_in, defaults_in, fields, action=None,stacks=None): - """ - fields = [("arg_name", "Prompt text")] - """ - - longest = 0 - for field, text in fields: - a = text + " (default= " + cur_config["PORTAINER_" + field.upper()] + ")" - if len(a) > longest: - longest = len(a) - - for field, text in fields: - # print(field) - value_in = getattr(args_in, field) - default = defaults_in.get(f"PORTAINER_{field}".upper()) - cur_site = defaults_in.get("PORTAINER_SITE".upper()) - cur_env = defaults_in.get("PORTAINER_ENVIRONMENT_ID".upper()) - - # print(value_in) - if value_in is None: - if default is not None: - prompt_text = f"{text} (default={default}) : " - # value_in = input(prompt) or default - if field == "site": - commands = ["portainer", "port"] - elif field == "deploy_mode": - commands = ["git", "upload"] - elif field == "stack_mode": - commands = ["swarm", "compose"] - elif field == "endpoint_id": - commands = por.endpoints_names - elif field == "stack": - if args.action == "create_stack": - # input(json.dumps(stacks, indent=2)) - commands = [ - 'authentik', 'bitwarden', 'bookstack', 'dockermon', 'fail2ban', 'gitea', 'gitlab', 'grafana', - 'hashicorp', 'home-assistant', 'homepage', 'immich', 'influxdb', 'jupyter', 'kestra', 'mailu3', - 'mealie', 'mediacenter', 'mosquitto', 'motioneye', 'n8n', 'nebula', 'nextcloud', 'nginx', - 'node-red', 'octoprint', 'ollama', 'onlyoffice', 'paperless-ngx', 'pihole', 'portainer-ce', 'rancher', 'registry', - 'regsync', 'semaphore', 'unifibrowser', 'uptime-kuma', 'watchtower', 'wazuh', 'webhub', 'wordpress', - 'wud', 'zabbix-server'] - try: - print(por.all_data['stacks'][defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]]['by_name'].keys()) - for s in por.all_data['stacks'][defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]]['by_name'].keys(): - - #print(s) - commands.remove(s) - except KeyError: - print("No stacks found for endpoint", defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]) - - else: - commands = [] - if por._debug: - input(por.stacks_all) - # print(defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]) - try: - for s in por.stacks_all[ - defaults_in[f"PORTAINER_ENDPOINT_ID".upper()] - ]["by_name"].keys(): - commands.append(s) - except KeyError: - print( - "No stacks found for endpoint", - defaults_in[f"PORTAINER_ENDPOINT_ID".upper()], - ) - sys.exit(1) - else: - commands = [] - completer = WordCompleter( - commands, ignore_case=True, match_middle=False - ) - try: - if field == "stack": - commands.sort() - commands_tuples = [(cmd, cmd) for cmd in commands] - commands_tuples.insert(0, ("__ALL__", "[Select ALL]")) - value_in = checkboxlist_dialog( - title="Select Services", - text="Choose one or more services:", - values=commands_tuples, - ).run() - - if value_in is None: - print("Cancelled.") - sys.exit(0) - elif "__ALL__" in value_in: - # User selected "Select ALL" - value_in = commands # all real commands - - - value_in.sort() - - if "pihole" in value_in: - if action == "delete_stack": - value_in.remove("pihole") - value_in.append("pihole") - else: - value_in.remove("pihole") - value_in.insert(0, "pihole") - print(" >> Stacks :", ",".join(value_in)) - else: - value_in = ( - prompt( - f" >> {prompt_text}", - completer=completer, - placeholder=default, - ) - or default - ) - except KeyboardInterrupt: - print("\n^C received — exiting cleanly.") - sys.exit(0) - - # value_in = input_with_default(prompt_text, default, longest+2) - - else: - # value_in = input(f"{text}: ") - commands = ["start", "stop", "status", "restart", "reload", "exit"] - completer = WordCompleter(commands, ignore_case=True) - try: - value_in = ( - prompt( - f" >> {text} {default}", - completer=completer, - placeholder=default, - ) - or default - ) - except KeyboardInterrupt: - print("\n^C received — exiting cleanly.") - sys.exit(0) - - # value_in = input_with_default(text, default, longest+2) - value_in.sort() - if por._debug: - print("Value entered:", value_in) - defaults_in[f"PORTAINER_{field}".upper()] = value_in - setattr(args, field, value_in) - - if field == "site" and value_in != cur_site: - por.get_site(value_in) - if value_in == "portainer": - defaults_in["PORTAINER_ENDPOINT_ID"] = "m-s" - elif value_in == "port": - defaults_in["PORTAINER_ENDPOINT_ID"] = "vm01" - if field == "stack" and value_in != cur_site: - os.environ[field] = ",".join(value_in) - else: - os.environ[field] = value_in - if por._debug: - print(f"{defaults_in} {field} {value_in}") - if field == "endpoint_id" and value_in != defaults_in.get( - "PORTAINER_ENDPOINT_ID".upper() - ): - print("refreshing environment") - por.get_endpoints() - with open("/myapps/portainer.conf", "w") as f: - for k in defaults_in.keys(): - f.write(f"{k}={defaults_in[k]}\n") - - return args - - -if __name__ == "__main__": - # Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below. - - def signal_handler(sig, frame): - logger.warning("Killed manually %s, %s", sig, frame) - print("\nTerminated by user") - print("\033[?25h", end="") - sys.exit(0) - signal.signal(signal.SIGINT, signal_handler) - os.system("cls" if os.name == "nt" else "clear") - if args.action is None: - actions = [ - ("create_stack","create_stack"), - ("delete_stack","delete_stack"), - ("stop_stack","stop_stack"), - ("start_stack","start_stack"), - ("restart_service","restart_service"), - ("update_service","update_service"), - ("update_containers","update_containers"), - ("list_stacks","list_stacks"), - ("update_stack","update_stack"), - ("secrets","secrets"), - ("print_all_data","print_all_data"), - ("list_endpoints","list_endpoints"), - ("list_containers","list_containers"), - ("stop_containers","stop_containers"), - ("start_containers","start_containers"), - ("refresh_environment","refresh_environment"), - ("refresh_status","refresh_status"), - ("update_status","update_status"), - ] - - selected_action = radiolist_dialog( - title="Select one service", - text="Choose a service:", - values=actions - ).run() - - - - print("Selected:", selected_action) - - - - - # print("Possible actions: \n") - # i = 1 - # for a in actions: - # print(f" > {i:>2}. {a}") - # i += 1 - # ans = input("\nSelect action to perform: ") - args.action = selected_action - - - os.system("cls" if os.name == "nt" else "clear") - # Example: list endpoints - por = PortainerApi(cur_config["PORTAINER_SITE"], args) - por.set_defaults(cur_config) - if args.debug: - por._debug = True - - if args.action == "secrets": - args = prompt_missing_args( - args, - cur_config, - [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID"), - ], - ) - secrets = { - "gitea_runner_registration_token": "8nmKqJhkvYwltmNfF2o9vs0tzo70ufHSQpVg6ymb", - "influxdb2-admin-token": "l4c1j4yd33Du5lo", - "ha_influxdb2_admin_token": "l4c1j4yd33Du5lo", - "wordpress_db_password": "wordpress", - "wordpress_root_db_password": "wordpress", - } - for key, value in secrets.items(): - res = por.create_secret(key, value, args.endpoint_id, args.timeout) - print(res) - sys.exit() - - if args.action == "delete_stack": - args = prompt_missing_args( - args, - cur_config, - [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID"), - ("stack", "Stack name or ID"), - ], - action="delete_stack", - ) - - input( - f"\nDelete stack {','.join(args.stack)} on endpoint {args.endpoint_id}. Press ENTER to continue..." - ) - por.delete_stack( - args.endpoint_id, - args.stack, - ) - sys.exit() - - if args.action == "create_stack": - por.action = "create_stack" - #print(cur_config) - #print(args) - args = prompt_missing_args( - args, - cur_config, - [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID"), - ("stack", "Stack name or ID"), - ("stack_mode", "Stack mode (swarm or compose)"), - ("deploy_mode", "Deploy mode (git or upload)"), - ], - por, - ) - por.create_stack( - args.endpoint_id, - args.stack, - args.deploy_mode, - args.autostart, - args.stack_mode, - ) - sys.exit() - - if args.action == "stop_stack": - args = prompt_missing_args( - args, - cur_config, - [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID"), - ("stack", "Stack name or ID"), - ], - ) - - por.stop_stack(args.stack, args.endpoint_id) - sys.exit() - - if args.action == "start_stack": - args = prompt_missing_args( - args, - cur_config, - [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID"), - ("stack", "Stack name or ID"), - ], - ) - por.start_stack(args.stack, args.endpoint_id) - sys.exit() - - if args.action == "restart_service": - args = prompt_missing_args( - args, - cur_config, - [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID") - ], - ) - por.restart_service(args.endpoint_id, "lala") - sys.exit() - - if args.action == "update_service": - args = prompt_missing_args( - args, - cur_config, - [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID") - ], - ) - por.update_service() - if args.launcher: - input("\nPress ENTER to continue...") - sys.exit() - - if args.action == "update_containers": - - - - args = prompt_missing_args( - args, - cur_config, - [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID") - ], - ) - por.update_containers() - sys.exit() - - if args.action == "list_stacks": - args = prompt_missing_args( - args, - cur_config, - [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID"), - ], - ) - por.print_stacks(args) - if args.launcher: - input("Press ENTER to continue...") - # print(json.dumps(por.all_data, indent=2)) - sys.exit() - - if args.action == "list_containers": - print("Getting containers") - args = prompt_missing_args( - args, - cur_config, - [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID"), - ], - ) - print("\n".join(por.get_containers())) - if args.launcher: - input("\nPress ENTER to continue...") - sys.exit() - - if args.action == "update_stack": - args = prompt_missing_args( - args, - cur_config, - [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID") - ], - ) - - por.update_stack(args) - if args.launcher: - input("\nPress ENTER to continue...") - sys.exit() - - if args.action == "print_all_data": - print(json.dumps(por.all_data, indent=2)) - if args.launcher: - input("\nPress ENTER to continue...") - sys.exit() - - if args.action == "update_status": - por.update_status(args.endpoint_id, args.stack) - sys.exit() - - if args.action == "list_endpoints": - eps = por.get_endpoints(args) - export_data = [] - for i in eps["by_id"]: - export_data.append([i, eps["by_id"][i]]) - headers = ["EndpointId", "Name"] - print(tabulate(export_data, headers=headers, tablefmt="github")) - if args.launcher: - input("\nPress ENTER to continue...") - sys.exit() - - if args.action == "stop_containers": - # TODO: does not work - args = prompt_missing_args( - args, - cur_config, - [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID"), - ], - ) - if por.all_data["endpoints_status"][args.endpoint_id] != 1: - print(f"Endpoint {por.get_endpoint_name(args.endpoint_id)} is offline") - sys.exit() - print(f"Stopping containers on {por.get_endpoint_name(args.endpoint_id)}") - cont = [] - for c in por.all_data["containers"][args.endpoint_id]: - if args.stack in (c, "all"): - cont += por.all_data["containers"][args.endpoint_id][c] - por.stop_containers(args.endpoint_id, cont) - sys.exit() - - if args.action == "start_containers": - print("Starting containers") - cont = [] - # input(json.dumps(por.all_data, indent=2)) - for c in por.all_data["containers"][args.endpoint_id]: - if args.stack in (c, "all"): - cont += por.all_data["containers"][args.endpoint_id][c] - por.start_containers(args.endpoint_id, cont) - sys.exit() - if args.action == "start_containers": - print("Starting containers") - cont = [] - # input(json.dumps(por.all_data,indent=2)) - for c in por.all_data["containers"][args.endpoint_id]: - if args.stack in (c, "all"): - cont += por.all_data["containers"][args.endpoint_id][c] - por.start_containers(args.endpoint_id, cont) - sys.exit() - if args.action == "refresh_environment": - cont = por.refresh() - sys.exit() - - if args.action == "refresh_status": - if args.stack == "all": - print("Stopping all stacks...") - stcks = por.get_stacks(endpoint_id=args.endpoint_id) - else: - por.refresh_status(args.stack_id) diff --git a/__init__.py b/portainer/__init__.py similarity index 100% rename from __init__.py rename to portainer/__init__.py diff --git a/port.py b/portainer/api.py similarity index 100% rename from port.py rename to portainer/api.py