Files
portainer/portainer.py
2025-12-13 19:32:29 +01:00

648 lines
22 KiB
Python
Executable File

"""
Portainer API Client module.
This module provides a wrapper for interacting with the Portainer API
to manage endpoints, stacks, and containers.
"""
# !/myapps/venvs/portainer/bin/python3
import os
import logging
import signal
import sys
import json
import argparse
import tty
import termios
import hvac
from tabulate import tabulate
from port import Portainer
from prompt_toolkit import prompt
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.shortcuts import checkboxlist_dialog
from prompt_toolkit.shortcuts import radiolist_dialog
VAULT_ADDR = os.environ.get("VAULT_ADDR", "http://192.168.77.101:8200")
try:
VAULT_TOKEN = os.environ.get("VAULT_TOKEN")
if VAULT_TOKEN is None:
raise KeyError
except KeyError:
VAULT_TOKEN = prompt("Valult root token : ", is_password=True)
os.environ["VAULT_TOKEN"] = VAULT_TOKEN
client = hvac.Client(url=VAULT_ADDR, token=VAULT_TOKEN)
# Check if connected
if client.is_authenticated():
print("Connected to Vault")
else:
raise Exception("Failed to authenticate with Vault")
# Specify the mount point of your KV engine
VERSION = "0.1.14"
defaults = {
"endpoint_id": "vm01",
"stack": "my_stack",
"deploy_mode": "git",
"autostart": "True",
"stack_mode": "swarm",
"site": "portainer",
}
cur_config = {}
def load_config(defaults=defaults):
'''Load configuration from /myapps/portainer.conf if it exists, else from env vars or defaults.'''
if os.path.exists("/myapps/portainer.conf"):
with open("/myapps/portainer.conf", "r") as f:
conf_data = f.read()
for line in conf_data.split("\n"):
if line.startswith("#") or line.strip() == "":
continue
key, value = line.split("=", 1)
os.environ[key.strip()] = value.strip()
cur_config[key.strip()] = value.strip()
else:
print("No /myapps/portainer.conf file found, proceeding with env vars.")
os.makedirs("/myapps", exist_ok=True)
for field in defaults.keys():
value_in = os.getenv(f"PORTAINER_{field.upper()}")
if value_in is not None:
os.environ[f"PORTAINER_{field.upper()}"] = value_in
cur_config[f"PORTAINER_{field.upper()}"] = value_in
else:
os.environ[f"PORTAINER_{field.upper()}"] = defaults[field]
cur_config[f"PORTAINER_{field.upper()}"] = defaults[field]
conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items())
# print("Using the following configuration:")
with open("/myapps/portainer.conf", "w") as f:
f.write(conf_data)
print("Configuration written to /myapps/portainer.conf")
return cur_config
a = load_config(defaults)
# ENV_VARS = [
# "PORTAINER_URL",
# "PORTAINER_SITE",
# "PORTAINER_ENDPOINT_ID",
# "PORTAINER_STACK",
# "PORTAINER_DEPLOY_MODE",
# "PORTAINER_STACK_MODE",
# ]
def update_configs(cur_config):
'''Update defaults from environment variables if set.'''
conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items())
# print("Using the following configuration:")
# print(conf_data)
with open("/myapps/portainer.conf", "w") as f:
f.write(conf_data)
print("Configuration written to /myapps/portainer.conf")
parser = argparse.ArgumentParser(
description="Portainer helper - use env vars or pass credentials."
)
parser.add_argument(
"--base",
"-b",
default=os.getenv("PORTAINER_URL", "https://portainer.example.com"),
help="Base URL for Portainer (ENV: PORTAINER_URL)",
)
parser.add_argument("--site", "-t", type=str, default=None, help="Site")
parser.add_argument(
"--endpoint-id",
"-e",
type=str,
default=None,
help="Endpoint ID to limit stack operations",
)
parser.add_argument(
"--service-id",
"-i",
type=str,
default=None,
help="Service ID to limit service operations",
)
parser.add_argument("--stack", "-s", type=str, default=None, nargs="+", help="Stack ID for operations")
parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform")
parser.add_argument(
"--autostart", "-Z", action="store_true", help="Auto-start created stacks"
)
parser.add_argument("--update", "-u", action="store_true", help="Update service if it exists")
parser.add_argument("--debug", "-D", action="store_true")
parser.add_argument("--gpu", "-g", action="store_true")
parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds")
parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode")
parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode")
args = parser.parse_args()
print("Running version:", VERSION)
print("Environment:", args.site)
args.client = client
if args.site is not None:
cur_config["PORTAINER_SITE"] = args.site
if args.endpoint_id is not None:
cur_config["PORTAINER_ENDPOINT_ID"] = args.endpoint_id
if args.stack is not None:
cur_config["PORTAINER_STACK"] = args.stack
if args.deploy_mode is not None:
cur_config["PORTAINER_DEPLOY_MODE"] = args.deploy_mode
if args.stack_mode is not None:
cur_config["PORTAINER_STACK_MODE"] = args.stack_mode
update_configs(cur_config)
if args.debug:
input(cur_config)
_LOG_LEVEL = "INFO"
LOG_FILE = "/tmp/portainer.log"
if _LOG_LEVEL == "DEBUG":
logging.basicConfig(
filename=LOG_FILE,
level=logging.DEBUG,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.debug("using debug logging")
elif _LOG_LEVEL == "ERROR":
logging.basicConfig(
filename=LOG_FILE,
level=logging.ERROR,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.info("using error logging")
elif _LOG_LEVEL == "SCAN":
logging.basicConfig(
filename=LOG_FILE,
level=logging.DEBUG,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.info("using scan logging")
else:
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.info("script started")
logger = logging.getLogger(__name__)
def wl(msg):
"""Write log message if debug is enabled."""
if args.debug:
print(msg)
def prompt_missing_args(args_in, defaults_in, fields, action=None,stacks=None):
"""
fields = [("arg_name", "Prompt text")]
"""
longest = 0
for field, text in fields:
a = text + " (default= " + cur_config["PORTAINER_" + field.upper()] + ")"
if len(a) > longest:
longest = len(a)
for field, text in fields:
# print(field)
value_in = getattr(args_in, field)
default = defaults_in.get(f"PORTAINER_{field}".upper())
cur_site = defaults_in.get("PORTAINER_SITE".upper())
cur_env = defaults_in.get("PORTAINER_ENVIRONMENT_ID".upper())
# print(value_in)
if value_in is None:
if default is not None:
prompt_text = f"{text} (default={default}) : "
# value_in = input(prompt) or default
if field == "site":
commands = ["portainer", "port"]
elif field == "deploy_mode":
commands = ["git", "upload"]
elif field == "stack_mode":
commands = ["swarm", "compose"]
elif field == "endpoint_id":
commands = por.endpoints_names
elif field == "stack":
if args.action == "create_stack":
# input(json.dumps(stacks, indent=2))
commands = [
'authentik', 'bitwarden', 'bookstack', 'dockermon', 'fail2ban', 'gitea', 'gitlab', 'grafana',
'hashicorp', 'home-assistant', 'homepage', 'immich', 'influxdb', 'jupyter', 'kestra', 'mailu3',
'mealie', 'mediacenter', 'mosquitto', 'motioneye', 'n8n', 'nebula', 'nextcloud', 'nginx',
'node-red', 'octoprint', 'ollama', 'onlyoffice', 'paperless-ngx', 'pihole', 'portainer-ce', 'rancher', 'registry',
'regsync', 'semaphore', 'unifibrowser', 'uptime-kuma', 'watchtower', 'wazuh', 'webhub', 'wordpress',
'wud', 'zabbix-server']
try:
print(por.all_data['stacks'][defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]]['by_name'].keys())
for s in por.all_data['stacks'][defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]]['by_name'].keys():
#print(s)
commands.remove(s)
except KeyError:
print("No stacks found for endpoint", defaults_in[f"PORTAINER_ENDPOINT_ID".upper()])
else:
commands = []
if por._debug:
input(por.stacks_all)
# print(defaults_in[f"PORTAINER_ENDPOINT_ID".upper()])
try:
for s in por.stacks_all[
defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]
]["by_name"].keys():
commands.append(s)
except KeyError:
print(
"No stacks found for endpoint",
defaults_in[f"PORTAINER_ENDPOINT_ID".upper()],
)
sys.exit(1)
else:
commands = []
completer = WordCompleter(
commands, ignore_case=True, match_middle=False
)
try:
if field == "stack":
commands.sort()
commands_tuples = [(cmd, cmd) for cmd in commands]
commands_tuples.insert(0, ("__ALL__", "[Select ALL]"))
value_in = checkboxlist_dialog(
title="Select Services",
text="Choose one or more services:",
values=commands_tuples,
).run()
if value_in is None:
print("Cancelled.")
sys.exit(0)
elif "__ALL__" in value_in:
# User selected "Select ALL"
value_in = commands # all real commands
value_in.sort()
if "pihole" in value_in:
if action == "delete_stack":
value_in.remove("pihole")
value_in.append("pihole")
else:
value_in.remove("pihole")
value_in.insert(0, "pihole")
print(" >> Stacks :", ",".join(value_in))
else:
value_in = (
prompt(
f" >> {prompt_text}",
completer=completer,
placeholder=default,
)
or default
)
except KeyboardInterrupt:
print("\n^C received — exiting cleanly.")
sys.exit(0)
# value_in = input_with_default(prompt_text, default, longest+2)
else:
# value_in = input(f"{text}: ")
commands = ["start", "stop", "status", "restart", "reload", "exit"]
completer = WordCompleter(commands, ignore_case=True)
try:
value_in = (
prompt(
f" >> {text} {default}",
completer=completer,
placeholder=default,
)
or default
)
except KeyboardInterrupt:
print("\n^C received — exiting cleanly.")
sys.exit(0)
# value_in = input_with_default(text, default, longest+2)
value_in.sort()
if por._debug:
print("Value entered:", value_in)
defaults_in[f"PORTAINER_{field}".upper()] = value_in
setattr(args, field, value_in)
if field == "site" and value_in != cur_site:
por.get_site(value_in)
if value_in == "portainer":
defaults_in["PORTAINER_ENDPOINT_ID"] = "m-s"
elif value_in == "port":
defaults_in["PORTAINER_ENDPOINT_ID"] = "vm01"
if field == "stack" and value_in != cur_site:
os.environ[field] = ",".join(value_in)
else:
os.environ[field] = value_in
if por._debug:
print(f"{defaults_in} {field} {value_in}")
if field == "endpoint_id" and value_in != defaults_in.get(
"PORTAINER_ENDPOINT_ID".upper()
):
print("refreshing environment")
por.get_endpoints()
with open("/myapps/portainer.conf", "w") as f:
for k in defaults_in.keys():
f.write(f"{k}={defaults_in[k]}\n")
return args
if __name__ == "__main__":
# Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below.
# token = get_portainer_token(base,"admin","l4c1j4yd33Du5lo") # or get_portainer_token(base, "admin", "secret")
def signal_handler(sig, frame):
logger.warning("Killed manually %s, %s", sig, frame)
print("\nTerminated by user")
print("\033[?25h", end="")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
os.system("cls" if os.name == "nt" else "clear")
if args.action is None:
actions = [
("create_stack","create_stack"),
("delete_stack","delete_stack"),
("stop_stack","stop_stack"),
("start_stack","start_stack"),
("restart_service","restart_service"),
("update_service","update_service"),
("update_containers","update_containers"),
("list_stacks","list_stacks"),
("update_stack","update_stack"),
("secrets","secrets"),
("print_all_data","print_all_data"),
("list_endpoints","list_endpoints"),
("list_containers","list_containers"),
("stop_containers","stop_containers"),
("start_containers","start_containers"),
("refresh_environment","refresh_environment"),
("refresh_status","refresh_status"),
("update_status","update_status"),
]
selected_action = radiolist_dialog(
title="Select one service",
text="Choose a service:",
values=actions
).run()
print("Selected:", selected_action)
# print("Possible actions: \n")
# i = 1
# for a in actions:
# print(f" > {i:>2}. {a}")
# i += 1
# ans = input("\nSelect action to perform: ")
args.action = selected_action
os.system("cls" if os.name == "nt" else "clear")
# Example: list endpoints
por = Portainer(cur_config["PORTAINER_SITE"], args)
por.set_defaults(cur_config)
if args.debug:
por._debug = True
if args.action == "secrets":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
],
)
secrets = {
"gitea_runner_registration_token": "8nmKqJhkvYwltmNfF2o9vs0tzo70ufHSQpVg6ymb",
"influxdb2-admin-token": "l4c1j4yd33Du5lo",
"ha_influxdb2_admin_token": "l4c1j4yd33Du5lo",
"wordpress_db_password": "wordpress",
"wordpress_root_db_password": "wordpress",
}
for key, value in secrets.items():
res = por.create_secret(key, value, args.endpoint_id, args.timeout)
print(res)
sys.exit()
if args.action == "delete_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
action="delete_stack",
)
input(
f"\nDelete stack {','.join(args.stack)} on endpoint {args.endpoint_id}. Press ENTER to continue..."
)
por.delete_stack(
args.endpoint_id,
args.stack,
)
sys.exit()
if args.action == "create_stack":
por.action = "create_stack"
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
("stack_mode", "Stack mode (swarm or compose)"),
("deploy_mode", "Deploy mode (git or upload)"),
],
por,
)
por.create_stack(
args.endpoint_id,
args.stack,
args.deploy_mode,
args.autostart,
args.stack_mode,
)
sys.exit()
if args.action == "stop_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
)
por.stop_stack(args.stack, args.endpoint_id)
sys.exit()
if args.action == "start_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
)
por.start_stack(args.stack, args.endpoint_id)
sys.exit()
if args.action == "restart_service":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.restart_service(args.endpoint_id, "lala")
sys.exit()
if args.action == "update_service":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.update_service()
sys.exit()
if args.action == "update_containers":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.update_containers()
sys.exit()
if args.action == "list_stacks":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
],
)
por.print_stacks(args.endpoint_id)
# print(json.dumps(por.all_data, indent=2))
sys.exit()
if args.action == "list_containers":
print("Getting containers")
print(por.get_containers())
sys.exit()
if args.action == "update_stack":
print("Updating stacks")
por.update_stack(args.endpoint_id, args.stack, args.autostart)
sys.exit()
if args.action == "print_all_data":
print(json.dumps(por.all_data, indent=2))
sys.exit()
if args.action == "update_status":
por.update_status(args.endpoint_id, args.stack)
sys.exit()
if args.action == "list_endpoints":
eps = por.get_endpoints(args)
export_data = []
for i in eps["by_id"]:
export_data.append([i, eps["by_id"][i]])
headers = ["EndpointId", "Name"]
print(tabulate(export_data, headers=headers, tablefmt="github"))
sys.exit()
if args.action == "stop_containers":
if por.all_data["endpoints_status"][args.endpoint_id] != 1:
print(f"Endpoint {por.get_endpoint_name(args.endpoint_id)} is offline")
sys.exit()
print(f"Stopping containers on {por.get_endpoint_name(args.endpoint_id)}")
cont = []
for c in por.all_data["containers"][args.endpoint_id]:
if args.stack in (c, "all"):
cont += por.all_data["containers"][args.endpoint_id][c]
por.stop_containers(args.endpoint_id, cont)
sys.exit()
if args.action == "start_containers":
print("Starting containers")
cont = []
# input(json.dumps(por.all_data, indent=2))
for c in por.all_data["containers"][args.endpoint_id]:
if args.stack in (c, "all"):
cont += por.all_data["containers"][args.endpoint_id][c]
por.start_containers(args.endpoint_id, cont)
sys.exit()
if args.action == "start_containers":
print("Starting containers")
cont = []
# input(json.dumps(por.all_data,indent=2))
for c in por.all_data["containers"][args.endpoint_id]:
if args.stack in (c, "all"):
cont += por.all_data["containers"][args.endpoint_id][c]
por.start_containers(args.endpoint_id, cont)
sys.exit()
if args.action == "refresh_environment":
cont = por.refresh()
sys.exit()
if args.action == "refresh_status":
if args.stack == "all":
print("Stopping all stacks...")
stcks = por.get_stacks(endpoint_id=args.endpoint_id)
else:
por.refresh_status(args.stack_id)