""" Portainer API Client module. This module provides a wrapper for interacting with the Portainer API to manage endpoints, stacks, and containers. """ import os import logging import signal import sys import json import argparse import hvac import time import base64 import shutil import requests from portainer.api import PortainerApi from git import Repo from concurrent.futures import ThreadPoolExecutor from tabulate import tabulate from prompt_toolkit import prompt from prompt_toolkit.completion import WordCompleter from prompt_toolkit.shortcuts import checkboxlist_dialog from prompt_toolkit.shortcuts import radiolist_dialog VAULT_ADDR = os.environ.get("VAULT_ADDR", "http://192.168.77.101:8200") try: VAULT_TOKEN = os.environ.get("VAULT_TOKEN") if VAULT_TOKEN is None: raise KeyError except KeyError: VAULT_TOKEN = prompt("Valult root token : ", is_password=True) os.environ["VAULT_TOKEN"] = VAULT_TOKEN client = hvac.Client(url=VAULT_ADDR, token=VAULT_TOKEN) # Check if connected if client.is_authenticated(): print("Connected to Vault") else: raise Exception("Failed to authenticate with Vault") # Specify the mount point of your KV engine VERSION = "0.1.53" defaults = { "endpoint_id": "vm01", "stack": "my_stack", "deploy_mode": "git", "autostart": "True", "stack_mode": "swarm", "site": "portainer", } cur_config = {} def load_config(defaults=defaults): '''Load configuration from /myapps/portainer.conf if it exists, else from env vars or defaults.''' if os.path.exists("/myapps/portainer.conf"): with open("/myapps/portainer.conf", "r") as f: conf_data = f.read() for line in conf_data.split("\n"): if line.startswith("#") or line.strip() == "": continue key, value = line.split("=", 1) os.environ[key.strip()] = value.strip() cur_config[key.strip()] = value.strip() else: print("No /myapps/portainer.conf file found, proceeding with env vars.") os.makedirs("/myapps", exist_ok=True) for field in defaults.keys(): value_in = os.getenv(f"PORTAINER_{field.upper()}") if value_in is not None: os.environ[f"PORTAINER_{field.upper()}"] = value_in cur_config[f"PORTAINER_{field.upper()}"] = value_in else: os.environ[f"PORTAINER_{field.upper()}"] = defaults[field] cur_config[f"PORTAINER_{field.upper()}"] = defaults[field] conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items()) # print("Using the following configuration:") with open("/myapps/portainer.conf", "w") as f: f.write(conf_data) print("Configuration written to /myapps/portainer.conf") return cur_config a = load_config(defaults) # ENV_VARS = [ # "PORTAINER_URL", # "PORTAINER_SITE", # "PORTAINER_ENDPOINT_ID", # "PORTAINER_STACK", # "PORTAINER_DEPLOY_MODE", # "PORTAINER_STACK_MODE", # ] def update_configs(cur_config): '''Update defaults from environment variables if set.''' conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items()) # print("Using the following configuration:") # print(conf_data) with open("/myapps/portainer.conf", "w") as f: f.write(conf_data) print("Configuration written to /myapps/portainer.conf") parser = argparse.ArgumentParser( description=f"""\ Portainer helper - use env vars or pass credentials." version: {VERSION} """, formatter_class=argparse.RawTextHelpFormatter, ) parser.add_argument( "--base", "-b", default=os.getenv("PORTAINER_URL", "https://portainer.example.com"), help="Base URL for Portainer (ENV: PORTAINER_URL)", ) parser.add_argument("--site", "-t", type=str, default=None, help="Site") parser.add_argument( "--endpoint-id", "-e", type=str, default=None, help="Endpoint ID to limit stack operations", ) parser.add_argument( "--service-id", "-i", type=str, default=None, help="Service ID to limit service operations", ) parser.add_argument("--stack", "-s", type=str, default=None, nargs="+", help="Stack ID for operations") parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform") parser.add_argument( "--autostart", "-Z", action="store_true", help="Auto-start created stacks" ) parser.add_argument("--update", "-u", action="store_true", help="Update service if it exists") parser.add_argument("--debug", "-D", action="store_true") parser.add_argument("--launcher", "-L", action="store_true") parser.add_argument("--gpu", "-g", action="store_true") parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds") parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode") parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode") args = parser.parse_args() print("Running version:", VERSION) print("Environment:", args.site) args.client = client if args.site is not None: cur_config["PORTAINER_SITE"] = args.site if args.endpoint_id is not None: cur_config["PORTAINER_ENDPOINT_ID"] = args.endpoint_id if args.stack is not None: cur_config["PORTAINER_STACK"] = args.stack if args.deploy_mode is not None: cur_config["PORTAINER_DEPLOY_MODE"] = args.deploy_mode if args.stack_mode is not None: cur_config["PORTAINER_STACK_MODE"] = args.stack_mode update_configs(cur_config) if args.debug: input(cur_config) _LOG_LEVEL = "DEBUG" LOG_FILE = "/tmp/portainer.log" if _LOG_LEVEL == "DEBUG": logging.basicConfig( filename=LOG_FILE, level=logging.DEBUG, format="%(asctime)s : %(levelname)s : %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p", ) logging.debug("using debug logging") elif _LOG_LEVEL == "ERROR": logging.basicConfig( filename=LOG_FILE, level=logging.ERROR, format="%(asctime)s : %(levelname)s : %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p", ) logging.info("using error logging") elif _LOG_LEVEL == "SCAN": logging.basicConfig( filename=LOG_FILE, level=logging.DEBUG, format="%(asctime)s : %(levelname)s : %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p", ) logging.info("using scan logging") else: logging.basicConfig( filename=LOG_FILE, level=logging.INFO, format="%(asctime)s : %(levelname)s : %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p", ) logging.info("script started") logger = logging.getLogger(__name__) def wl(msg): """Write log message if debug is enabled.""" if args.debug: print(msg) def prompt_missing_args(args_in, defaults_in, fields, action=None,stacks=None): """ fields = [("arg_name", "Prompt text")] """ longest = 0 for field, text in fields: a = text + " (default= " + cur_config["PORTAINER_" + field.upper()] + ")" if len(a) > longest: longest = len(a) for field, text in fields: # print(field) value_in = getattr(args_in, field) default = defaults_in.get(f"PORTAINER_{field}".upper()) cur_site = defaults_in.get("PORTAINER_SITE".upper()) cur_env = defaults_in.get("PORTAINER_ENVIRONMENT_ID".upper()) # print(value_in) if value_in is None: if default is not None: prompt_text = f"{text} (default={default}) : " # value_in = input(prompt) or default if field == "site": commands = ["portainer", "port"] elif field == "deploy_mode": commands = ["git", "upload"] elif field == "stack_mode": commands = ["swarm", "compose"] elif field == "endpoint_id": commands = por.endpoints_names elif field == "stack": if args.action == "create_stack": # input(json.dumps(stacks, indent=2)) commands = [ 'authentik', 'bitwarden', 'bookstack', 'dockermon', 'duplicati', 'fail2ban', 'gitea', 'gitlab', 'grafana', 'grocy', 'hashicorp', 'home-assistant', 'homebox','homepage', 'immich', 'influxdb', 'jupyter', 'kestra', 'kopia', 'mailu3', 'mealie', 'mediacenter', 'mosquitto', 'motioneye', 'n8n', 'nebula', 'nextcloud', 'nginx', 'node-red', 'octoprint', 'ollama', 'onlyoffice', 'paperless-ngx', 'pihole', 'portainer-ce', 'rancher', 'registry', 'regsync', 'semaphore', 'unifibrowser', 'uptime-kuma', 'watchtower', 'wazuh', 'webhub', 'wordpress', 'wud', 'zabbix-server'] try: print(por.all_data['stacks'][defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]]['by_name'].keys()) for s in por.all_data['stacks'][defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]]['by_name'].keys(): #print(s) commands.remove(s) except KeyError: print("No stacks found for endpoint", defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]) else: commands = [] if por._debug: input(por.stacks_all) # print(defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]) try: for s in por.stacks_all[ defaults_in[f"PORTAINER_ENDPOINT_ID".upper()] ]["by_name"].keys(): commands.append(s) except KeyError: print( "No stacks found for endpoint", defaults_in[f"PORTAINER_ENDPOINT_ID".upper()], ) sys.exit(1) else: commands = [] completer = WordCompleter( commands, ignore_case=True, match_middle=False ) try: if field == "stack": commands.sort() commands_tuples = [(cmd, cmd) for cmd in commands] commands_tuples.insert(0, ("__ALL__", "[Select ALL]")) value_in = checkboxlist_dialog( title="Select Services", text="Choose one or more services:", values=commands_tuples, ).run() if value_in is None: print("Cancelled.") sys.exit(0) elif "__ALL__" in value_in: # User selected "Select ALL" value_in = commands # all real commands value_in.sort() if "pihole" in value_in: if action == "delete_stack": value_in.remove("pihole") value_in.append("pihole") else: value_in.remove("pihole") value_in.insert(0, "pihole") print(" >> Stacks :", ",".join(value_in)) else: value_in = ( prompt( f" >> {prompt_text}", completer=completer, placeholder=default, ) or default ) except KeyboardInterrupt: print("\n^C received — exiting cleanly.") sys.exit(0) # value_in = input_with_default(prompt_text, default, longest+2) else: # value_in = input(f"{text}: ") commands = ["start", "stop", "status", "restart", "reload", "exit"] completer = WordCompleter(commands, ignore_case=True) try: value_in = ( prompt( f" >> {text} {default}", completer=completer, placeholder=default, ) or default ) except KeyboardInterrupt: print("\n^C received — exiting cleanly.") sys.exit(0) # value_in = input_with_default(text, default, longest+2) value_in.sort() if por._debug: print("Value entered:", value_in) defaults_in[f"PORTAINER_{field}".upper()] = value_in setattr(args, field, value_in) if field == "site" and value_in != cur_site: por.get_site(value_in) if value_in == "portainer": defaults_in["PORTAINER_ENDPOINT_ID"] = "m-s" elif value_in == "port": defaults_in["PORTAINER_ENDPOINT_ID"] = "vm01" if field == "stack" and value_in != cur_site: os.environ[field] = ",".join(value_in) else: os.environ[field] = value_in if por._debug: print(f"{defaults_in} {field} {value_in}") if field == "endpoint_id" and value_in != defaults_in.get( "PORTAINER_ENDPOINT_ID".upper() ): print("refreshing environment") por.get_endpoints() with open("/myapps/portainer.conf", "w") as f: for k in defaults_in.keys(): f.write(f"{k}={defaults_in[k]}\n") return args if __name__ == "__main__": # Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below. def signal_handler(sig, frame): logger.warning("Killed manually %s, %s", sig, frame) print("\nTerminated by user") print("\033[?25h", end="") sys.exit(0) signal.signal(signal.SIGINT, signal_handler) os.system("cls" if os.name == "nt" else "clear") if args.action is None: actions = [ ("create_stack","create_stack"), ("delete_stack","delete_stack"), ("stop_stack","stop_stack"), ("start_stack","start_stack"), ("restart_service","restart_service"), ("update_service","update_service"), ("update_containers","update_containers"), ("list_stacks","list_stacks"), ("update_stack","update_stack"), ("secrets","secrets"), ("print_all_data","print_all_data"), ("list_endpoints","list_endpoints"), ("list_containers","list_containers"), ("stop_containers","stop_containers"), ("start_containers","start_containers"), ("refresh_environment","refresh_environment"), ("refresh_status","refresh_status"), ("update_status","update_status"), ] selected_action = radiolist_dialog( title="Select one service", text="Choose a service:", values=actions ).run() print("Selected:", selected_action) # print("Possible actions: \n") # i = 1 # for a in actions: # print(f" > {i:>2}. {a}") # i += 1 # ans = input("\nSelect action to perform: ") args.action = selected_action os.system("cls" if os.name == "nt" else "clear") # Example: list endpoints por = PortainerApi(cur_config["PORTAINER_SITE"], args) por.set_defaults(cur_config) if args.debug: por._debug = True if args.action == "secrets": args = prompt_missing_args( args, cur_config, [ ("site", "Site"), ("endpoint_id", "Endpoint ID"), ], ) secrets = { "gitea_runner_registration_token": "8nmKqJhkvYwltmNfF2o9vs0tzo70ufHSQpVg6ymb", "influxdb2-admin-token": "l4c1j4yd33Du5lo", "ha_influxdb2_admin_token": "l4c1j4yd33Du5lo", "wordpress_db_password": "wordpress", "wordpress_root_db_password": "wordpress", } for key, value in secrets.items(): res = por.create_secret(key, value, args.endpoint_id, args.timeout) print(res) sys.exit() if args.action == "delete_stack": args = prompt_missing_args( args, cur_config, [ ("site", "Site"), ("endpoint_id", "Endpoint ID"), ("stack", "Stack name or ID"), ], action="delete_stack", ) input( f"\nDelete stack {','.join(args.stack)} on endpoint {args.endpoint_id}. Press ENTER to continue..." ) por.delete_stack( args.endpoint_id, args.stack, ) sys.exit() if args.action == "create_stack": por.action = "create_stack" #print(cur_config) #print(args) args = prompt_missing_args( args, cur_config, [ ("site", "Site"), ("endpoint_id", "Endpoint ID"), ("stack", "Stack name or ID"), ("stack_mode", "Stack mode (swarm or compose)"), ("deploy_mode", "Deploy mode (git or upload)"), ], por, ) por.create_stack( args.endpoint_id, args.stack, args.deploy_mode, args.autostart, args.stack_mode, ) sys.exit() if args.action == "stop_stack": args = prompt_missing_args( args, cur_config, [ ("site", "Site"), ("endpoint_id", "Endpoint ID"), ("stack", "Stack name or ID"), ], ) por.stop_stack(args.stack, args.endpoint_id) sys.exit() if args.action == "start_stack": args = prompt_missing_args( args, cur_config, [ ("site", "Site"), ("endpoint_id", "Endpoint ID"), ("stack", "Stack name or ID"), ], ) por.start_stack(args.stack, args.endpoint_id) sys.exit() if args.action == "restart_service": args = prompt_missing_args( args, cur_config, [ ("site", "Site"), ("endpoint_id", "Endpoint ID") ], ) por.restart_service(args.endpoint_id, "lala") sys.exit() if args.action == "update_service": args = prompt_missing_args( args, cur_config, [ ("site", "Site"), ("endpoint_id", "Endpoint ID") ], ) por.update_service() if args.launcher: input("\nPress ENTER to continue...") sys.exit() if args.action == "update_containers": args = prompt_missing_args( args, cur_config, [ ("site", "Site"), ("endpoint_id", "Endpoint ID") ], ) por.update_containers() sys.exit() if args.action == "list_stacks": args = prompt_missing_args( args, cur_config, [ ("site", "Site"), ("endpoint_id", "Endpoint ID"), ], ) por.print_stacks(args) if args.launcher: input("Press ENTER to continue...") # print(json.dumps(por.all_data, indent=2)) sys.exit() if args.action == "list_containers": print("Getting containers") args = prompt_missing_args( args, cur_config, [ ("site", "Site"), ("endpoint_id", "Endpoint ID"), ], ) print("\n".join(por.get_containers())) if args.launcher: input("\nPress ENTER to continue...") sys.exit() if args.action == "update_stack": args = prompt_missing_args( args, cur_config, [ ("site", "Site"), ("endpoint_id", "Endpoint ID") ], ) por.update_stack(args) if args.launcher: input("\nPress ENTER to continue...") sys.exit() if args.action == "print_all_data": print(json.dumps(por.all_data, indent=2)) if args.launcher: input("\nPress ENTER to continue...") sys.exit() if args.action == "update_status": por.update_status(args.endpoint_id, args.stack) sys.exit() if args.action == "list_endpoints": eps = por.get_endpoints(args) export_data = [] for i in eps["by_id"]: export_data.append([i, eps["by_id"][i]]) headers = ["EndpointId", "Name"] print(tabulate(export_data, headers=headers, tablefmt="github")) if args.launcher: input("\nPress ENTER to continue...") sys.exit() if args.action == "stop_containers": # TODO: does not work args = prompt_missing_args( args, cur_config, [ ("site", "Site"), ("endpoint_id", "Endpoint ID"), ], ) if por.all_data["endpoints_status"][args.endpoint_id] != 1: print(f"Endpoint {por.get_endpoint_name(args.endpoint_id)} is offline") sys.exit() print(f"Stopping containers on {por.get_endpoint_name(args.endpoint_id)}") cont = [] for c in por.all_data["containers"][args.endpoint_id]: if args.stack in (c, "all"): cont += por.all_data["containers"][args.endpoint_id][c] por.stop_containers(args.endpoint_id, cont) sys.exit() if args.action == "start_containers": print("Starting containers") cont = [] # input(json.dumps(por.all_data, indent=2)) for c in por.all_data["containers"][args.endpoint_id]: if args.stack in (c, "all"): cont += por.all_data["containers"][args.endpoint_id][c] por.start_containers(args.endpoint_id, cont) sys.exit() if args.action == "start_containers": print("Starting containers") cont = [] # input(json.dumps(por.all_data,indent=2)) for c in por.all_data["containers"][args.endpoint_id]: if args.stack in (c, "all"): cont += por.all_data["containers"][args.endpoint_id][c] por.start_containers(args.endpoint_id, cont) sys.exit() if args.action == "refresh_environment": cont = por.refresh() sys.exit() if args.action == "refresh_status": if args.stack == "all": print("Stopping all stacks...") stcks = por.get_stacks(endpoint_id=args.endpoint_id) else: por.refresh_status(args.stack_id)