Files
portainer/main.py
2026-01-24 21:03:32 +01:00

681 lines
23 KiB
Python
Executable File

"""
Portainer API Client module.
This module provides a wrapper for interacting with the Portainer API
to manage endpoints, stacks, and containers.
"""
import os
import logging
import signal
import sys
import json
import argparse
import hvac
import time
import base64
import shutil
import requests
from portainer.api import PortainerApi
from git import Repo
from concurrent.futures import ThreadPoolExecutor
from tabulate import tabulate
from prompt_toolkit import prompt
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.shortcuts import checkboxlist_dialog
from prompt_toolkit.shortcuts import radiolist_dialog
# VAULT_ADDR = os.environ.get("VAULT_ADDR", "http://192.168.77.101:8200")
VAULT_ADDR = os.environ.get("VAULT_ADDR", "https://vault.sectorq.eu")
try:
VAULT_TOKEN = os.environ.get("VAULT_TOKEN")
if VAULT_TOKEN is None:
raise KeyError
except KeyError:
VAULT_TOKEN = prompt("Valult root token : ", is_password=True)
os.environ["VAULT_TOKEN"] = VAULT_TOKEN
client = hvac.Client(url=VAULT_ADDR, token=VAULT_TOKEN)
# Check if connected
if client.is_authenticated():
print("Connected to Vault")
else:
raise Exception("Failed to authenticate with Vault")
# Specify the mount point of your KV engine
VERSION = "0.1.55"
defaults = {
"endpoint_id": "vm01",
"stack": "my_stack",
"deploy_mode": "git",
"autostart": "True",
"stack_mode": "swarm",
"site": "portainer",
}
cur_config = {}
def load_config(defaults=defaults):
'''Load configuration from /myapps/portainer.conf if it exists, else from env vars or defaults.'''
if os.path.exists("/myapps/portainer.conf"):
with open("/myapps/portainer.conf", "r") as f:
conf_data = f.read()
for line in conf_data.split("\n"):
if line.startswith("#") or line.strip() == "":
continue
key, value = line.split("=", 1)
os.environ[key.strip()] = value.strip()
cur_config[key.strip()] = value.strip()
else:
print("No /myapps/portainer.conf file found, proceeding with env vars.")
os.makedirs("/myapps", exist_ok=True)
for field in defaults.keys():
value_in = os.getenv(f"PORTAINER_{field.upper()}")
if value_in is not None:
os.environ[f"PORTAINER_{field.upper()}"] = value_in
cur_config[f"PORTAINER_{field.upper()}"] = value_in
else:
os.environ[f"PORTAINER_{field.upper()}"] = defaults[field]
cur_config[f"PORTAINER_{field.upper()}"] = defaults[field]
conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items())
# print("Using the following configuration:")
with open("/myapps/portainer.conf", "w") as f:
f.write(conf_data)
print("Configuration written to /myapps/portainer.conf")
return cur_config
a = load_config(defaults)
# ENV_VARS = [
# "PORTAINER_URL",
# "PORTAINER_SITE",
# "PORTAINER_ENDPOINT_ID",
# "PORTAINER_STACK",
# "PORTAINER_DEPLOY_MODE",
# "PORTAINER_STACK_MODE",
# ]
def update_configs(cur_config):
'''Update defaults from environment variables if set.'''
conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items())
# print("Using the following configuration:")
# print(conf_data)
with open("/myapps/portainer.conf", "w") as f:
f.write(conf_data)
print("Configuration written to /myapps/portainer.conf")
parser = argparse.ArgumentParser(
description=f"""\
Portainer helper - use env vars or pass credentials."
version: {VERSION}
""",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--base",
"-b",
default=os.getenv("PORTAINER_URL", "https://portainer.example.com"),
help="Base URL for Portainer (ENV: PORTAINER_URL)",
)
parser.add_argument("--site", "-t", type=str, default=None, help="Site")
parser.add_argument(
"--endpoint-id",
"-e",
type=str,
default=None,
help="Endpoint ID to limit stack operations",
)
parser.add_argument(
"--service-id",
"-i",
type=str,
default=None,
help="Service ID to limit service operations",
)
parser.add_argument("--stack", "-s", type=str, default=None, nargs="+", help="Stack ID for operations")
parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform")
parser.add_argument(
"--autostart", "-Z", action="store_true", help="Auto-start created stacks"
)
parser.add_argument("--update", "-u", action="store_true", help="Update service if it exists")
parser.add_argument("--debug", "-D", action="store_true")
parser.add_argument("--launcher", "-L", action="store_true")
parser.add_argument("--gpu", "-g", action="store_true")
parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds")
parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode")
parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode")
args = parser.parse_args()
print("Running version:", VERSION)
print("Environment:", args.site)
args.client = client
if args.site is not None:
cur_config["PORTAINER_SITE"] = args.site
if args.endpoint_id is not None:
cur_config["PORTAINER_ENDPOINT_ID"] = args.endpoint_id
if args.stack is not None:
cur_config["PORTAINER_STACK"] = args.stack
if args.deploy_mode is not None:
cur_config["PORTAINER_DEPLOY_MODE"] = args.deploy_mode
if args.stack_mode is not None:
cur_config["PORTAINER_STACK_MODE"] = args.stack_mode
update_configs(cur_config)
if args.debug:
input(cur_config)
_LOG_LEVEL = "DEBUG"
LOG_FILE = "/tmp/portainer.log"
if _LOG_LEVEL == "DEBUG":
logging.basicConfig(
filename=LOG_FILE,
level=logging.DEBUG,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.debug("using debug logging")
elif _LOG_LEVEL == "ERROR":
logging.basicConfig(
filename=LOG_FILE,
level=logging.ERROR,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.info("using error logging")
elif _LOG_LEVEL == "SCAN":
logging.basicConfig(
filename=LOG_FILE,
level=logging.DEBUG,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.info("using scan logging")
else:
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.info("script started")
logger = logging.getLogger(__name__)
def wl(msg):
"""Write log message if debug is enabled."""
if args.debug:
print(msg)
def prompt_missing_args(args_in, defaults_in, fields, action=None,stacks=None):
"""
fields = [("arg_name", "Prompt text")]
"""
longest = 0
for field, text in fields:
a = text + " (default= " + cur_config["PORTAINER_" + field.upper()] + ")"
if len(a) > longest:
longest = len(a)
for field, text in fields:
# print(field)
value_in = getattr(args_in, field)
default = defaults_in.get(f"PORTAINER_{field}".upper())
cur_site = defaults_in.get("PORTAINER_SITE".upper())
cur_env = defaults_in.get("PORTAINER_ENVIRONMENT_ID".upper())
# print(value_in)
if value_in is None:
if default is not None:
prompt_text = f"{text} (default={default}) : "
# value_in = input(prompt) or default
if field == "site":
commands = ["portainer", "port"]
elif field == "deploy_mode":
commands = ["git", "upload"]
elif field == "stack_mode":
commands = ["swarm", "compose"]
elif field == "endpoint_id":
commands = por.endpoints_names
elif field == "stack":
if args.action == "create_stack":
# input(json.dumps(stacks, indent=2))
commands = [
'authentik', 'bitwarden', 'bookstack', 'dockermon', 'duplicati', 'fail2ban', 'gitea', 'gitlab', 'grafana', 'grocy',
'hashicorp', 'home-assistant', 'homebox','homepage', 'immich', 'influxdb', 'jupyter', 'kestra', 'kopia', 'mailu3',
'mealie', 'mediacenter', 'mosquitto', 'motioneye', 'n8n', 'nebula', 'nextcloud', 'nginx',
'node-red', 'octoprint', 'ollama', 'onlyoffice', 'paperless-ngx', 'pihole', 'portainer-ce', 'rancher', 'registry',
'regsync', 'searxng','semaphore', 'unifibrowser', 'uptime-kuma', 'watchtower', 'wazuh', 'webhub', 'wordpress',
'wud', 'zabbix-server']
try:
print(por.all_data['stacks'][defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]]['by_name'].keys())
for s in por.all_data['stacks'][defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]]['by_name'].keys():
#print(s)
commands.remove(s)
except KeyError:
print("No stacks found for endpoint", defaults_in[f"PORTAINER_ENDPOINT_ID".upper()])
else:
commands = []
if por._debug:
input(por.stacks_all)
# print(defaults_in[f"PORTAINER_ENDPOINT_ID".upper()])
try:
for s in por.stacks_all[
defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]
]["by_name"].keys():
commands.append(s)
except KeyError:
print(
"No stacks found for endpoint",
defaults_in[f"PORTAINER_ENDPOINT_ID".upper()],
)
sys.exit(1)
else:
commands = []
completer = WordCompleter(
commands, ignore_case=True, match_middle=False
)
try:
if field == "stack":
commands.sort()
commands_tuples = [(cmd, cmd) for cmd in commands]
commands_tuples.insert(0, ("__ALL__", "[Select ALL]"))
value_in = checkboxlist_dialog(
title="Select Services",
text="Choose one or more services:",
values=commands_tuples,
).run()
if value_in is None:
print("Cancelled.")
sys.exit(0)
elif "__ALL__" in value_in:
# User selected "Select ALL"
value_in = commands # all real commands
value_in.sort()
if "pihole" in value_in:
if action == "delete_stack":
value_in.remove("pihole")
value_in.append("pihole")
else:
value_in.remove("pihole")
value_in.insert(0, "pihole")
print(" >> Stacks :", ",".join(value_in))
else:
value_in = (
prompt(
f" >> {prompt_text}",
completer=completer,
placeholder=default,
)
or default
)
except KeyboardInterrupt:
print("\n^C received — exiting cleanly.")
sys.exit(0)
# value_in = input_with_default(prompt_text, default, longest+2)
else:
# value_in = input(f"{text}: ")
commands = ["start", "stop", "status", "restart", "reload", "exit"]
completer = WordCompleter(commands, ignore_case=True)
try:
value_in = (
prompt(
f" >> {text} {default}",
completer=completer,
placeholder=default,
)
or default
)
except KeyboardInterrupt:
print("\n^C received — exiting cleanly.")
sys.exit(0)
# value_in = input_with_default(text, default, longest+2)
value_in.sort()
if por._debug:
print("Value entered:", value_in)
defaults_in[f"PORTAINER_{field}".upper()] = value_in
setattr(args, field, value_in)
if field == "site" and value_in != cur_site:
por.get_site(value_in)
if value_in == "portainer":
defaults_in["PORTAINER_ENDPOINT_ID"] = "m-s"
elif value_in == "port":
defaults_in["PORTAINER_ENDPOINT_ID"] = "vm01"
if field == "stack" and value_in != cur_site:
os.environ[field] = ",".join(value_in)
else:
os.environ[field] = value_in
if por._debug:
print(f"{defaults_in} {field} {value_in}")
if field == "endpoint_id" and value_in != defaults_in.get(
"PORTAINER_ENDPOINT_ID".upper()
):
print("refreshing environment")
por.get_endpoints()
with open("/myapps/portainer.conf", "w") as f:
for k in defaults_in.keys():
f.write(f"{k}={defaults_in[k]}\n")
return args
if __name__ == "__main__":
# Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below.
def signal_handler(sig, frame):
logger.warning("Killed manually %s, %s", sig, frame)
print("\nTerminated by user")
print("\033[?25h", end="")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
os.system("cls" if os.name == "nt" else "clear")
if args.action is None:
actions = [
("create_stack","create_stack"),
("delete_stack","delete_stack"),
("stop_stack","stop_stack"),
("start_stack","start_stack"),
("restart_service","restart_service"),
("update_service","update_service"),
("update_containers","update_containers"),
("list_stacks","list_stacks"),
("update_stack","update_stack"),
("secrets","secrets"),
("print_all_data","print_all_data"),
("list_endpoints","list_endpoints"),
("list_containers","list_containers"),
("stop_containers","stop_containers"),
("start_containers","start_containers"),
("refresh_environment","refresh_environment"),
("refresh_status","refresh_status"),
("update_status","update_status"),
]
selected_action = radiolist_dialog(
title=f"Select one service - version: {VERSION}",
text="Choose a service:",
values=actions
).run()
print("Selected:", selected_action)
# print("Possible actions: \n")
# i = 1
# for a in actions:
# print(f" > {i:>2}. {a}")
# i += 1
# ans = input("\nSelect action to perform: ")
args.action = selected_action
os.system("cls" if os.name == "nt" else "clear")
# Example: list endpoints
por = PortainerApi(cur_config["PORTAINER_SITE"], args)
por.set_defaults(cur_config)
if args.debug:
por._debug = True
if args.action == "secrets":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
],
)
secrets = {
"gitea_runner_registration_token": "8nmKqJhkvYwltmNfF2o9vs0tzo70ufHSQpVg6ymb",
"influxdb2-admin-token": "l4c1j4yd33Du5lo",
"ha_influxdb2_admin_token": "l4c1j4yd33Du5lo",
"wordpress_db_password": "wordpress",
"wordpress_root_db_password": "wordpress",
}
for key, value in secrets.items():
res = por.create_secret(key, value, args.endpoint_id, args.timeout)
print(res)
sys.exit()
if args.action == "delete_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
action="delete_stack",
)
input(
f"\nDelete stack {','.join(args.stack)} on endpoint {args.endpoint_id}. Press ENTER to continue..."
)
por.delete_stack(
args.endpoint_id,
args.stack,
)
sys.exit()
if args.action == "create_stack":
por.action = "create_stack"
#print(cur_config)
#print(args)
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
("stack_mode", "Stack mode (swarm or compose)"),
("deploy_mode", "Deploy mode (git or upload)"),
],
por,
)
por.create_stack(
args.endpoint_id,
args.stack,
args.deploy_mode,
args.autostart,
args.stack_mode,
)
sys.exit()
if args.action == "stop_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
)
por.stop_stack(args.stack, args.endpoint_id)
sys.exit()
if args.action == "start_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
)
por.start_stack(args.stack, args.endpoint_id)
sys.exit()
if args.action == "restart_service":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.restart_service(args.endpoint_id, "lala")
sys.exit()
if args.action == "update_service":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.update_service()
if args.launcher:
input("\nPress ENTER to continue...")
sys.exit()
if args.action == "update_containers":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.update_containers()
sys.exit()
if args.action == "list_stacks":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
],
)
por.print_stacks(args)
if args.launcher:
input("Press ENTER to continue...")
# print(json.dumps(por.all_data, indent=2))
sys.exit()
if args.action == "list_containers":
print("Getting containers")
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
],
)
print("\n".join(por.get_containers()))
if args.launcher:
input("\nPress ENTER to continue...")
sys.exit()
if args.action == "update_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.update_stack(args)
if args.launcher:
input("\nPress ENTER to continue...")
sys.exit()
if args.action == "print_all_data":
print(json.dumps(por.all_data, indent=2))
if args.launcher:
input("\nPress ENTER to continue...")
sys.exit()
if args.action == "update_status":
por.update_status(args.endpoint_id, args.stack)
sys.exit()
if args.action == "list_endpoints":
eps = por.get_endpoints(args)
export_data = []
for i in eps["by_id"]:
export_data.append([i, eps["by_id"][i]])
headers = ["EndpointId", "Name"]
print(tabulate(export_data, headers=headers, tablefmt="github"))
if args.launcher:
input("\nPress ENTER to continue...")
sys.exit()
if args.action == "stop_containers":
# TODO: does not work
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
],
)
if por.all_data["endpoints_status"][args.endpoint_id] != 1:
print(f"Endpoint {por.get_endpoint_name(args.endpoint_id)} is offline")
sys.exit()
print(f"Stopping containers on {por.get_endpoint_name(args.endpoint_id)}")
cont = []
for c in por.all_data["containers"][args.endpoint_id]:
if args.stack in (c, "all"):
cont += por.all_data["containers"][args.endpoint_id][c]
por.stop_containers(args.endpoint_id, cont)
sys.exit()
if args.action == "start_containers":
print("Starting containers")
cont = []
# input(json.dumps(por.all_data, indent=2))
for c in por.all_data["containers"][args.endpoint_id]:
if args.stack in (c, "all"):
cont += por.all_data["containers"][args.endpoint_id][c]
por.start_containers(args.endpoint_id, cont)
sys.exit()
if args.action == "start_containers":
print("Starting containers")
cont = []
# input(json.dumps(por.all_data,indent=2))
for c in por.all_data["containers"][args.endpoint_id]:
if args.stack in (c, "all"):
cont += por.all_data["containers"][args.endpoint_id][c]
por.start_containers(args.endpoint_id, cont)
sys.exit()
if args.action == "refresh_environment":
cont = por.refresh()
sys.exit()
if args.action == "refresh_status":
por.refresh_status(args)