Files
portainer/portainer.py
2025-12-04 23:24:11 +01:00

531 lines
18 KiB
Python
Executable File

"""
Portainer API Client module.
This module provides a wrapper for interacting with the Portainer API
to manage endpoints, stacks, and containers.
"""
# !/myapps/venvs/portainer/bin/python3
import os
import logging
import signal
import sys
import json
import argparse
import tty
import termios
from tabulate import tabulate
from port import Portainer
from prompt_toolkit import prompt
from prompt_toolkit.completion import WordCompleter
VERSION = "0.1.0"
defaults = {
"endpoint_id": "vm01",
"stack": "my_stack",
"deploy_mode": "git",
"autostart": "True",
"stack_mode": "swarm",
"site": "portainer",
}
cur_config = {}
def load_config(defaults=defaults):
'''Load configuration from /myapps/portainer.conf if it exists, else from env vars or defaults.'''
if os.path.exists("/myapps/portainer.conf"):
with open("/myapps/portainer.conf", "r") as f:
conf_data = f.read()
for line in conf_data.split("\n"):
if line.startswith("#") or line.strip() == "":
continue
key, value = line.split("=", 1)
os.environ[key.strip()] = value.strip()
cur_config[key.strip()] = value.strip()
else:
print("No /myapps/portainer.conf file found, proceeding with env vars.")
os.makedirs("/myapps", exist_ok=True)
for field in defaults.keys():
value_in = os.getenv(f"PORTAINER_{field.upper()}")
if value_in is not None:
os.environ[f"PORTAINER_{field.upper()}"] = value_in
cur_config[f"PORTAINER_{field.upper()}"] = value_in
else:
os.environ[f"PORTAINER_{field.upper()}"] = defaults[field]
cur_config[f"PORTAINER_{field.upper()}"] = defaults[field]
conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items())
# print("Using the following configuration:")
with open("/myapps/portainer.conf", "w") as f:
f.write(conf_data)
print("Configuration written to /myapps/portainer.conf")
return cur_config
cur_config = load_config(defaults)
# ENV_VARS = [
# "PORTAINER_URL",
# "PORTAINER_SITE",
# "PORTAINER_ENDPOINT_ID",
# "PORTAINER_STACK",
# "PORTAINER_DEPLOY_MODE",
# "PORTAINER_STACK_MODE",
# ]
def update_configs(cur_config):
'''Update defaults from environment variables if set.'''
conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items())
# print("Using the following configuration:")
# print(conf_data)
with open("/myapps/portainer.conf", "w") as f:
f.write(conf_data)
print("Configuration written to /myapps/portainer.conf")
parser = argparse.ArgumentParser(
description="Portainer helper - use env vars or pass credentials."
)
parser.add_argument(
"--base",
"-b",
default=os.getenv("PORTAINER_URL", "https://portainer.example.com"),
help="Base URL for Portainer (ENV: PORTAINER_URL)",
)
parser.add_argument("--site", "-t", type=str, default=None, help="Site")
parser.add_argument(
"--endpoint-id",
"-e",
type=str,
default=None,
help="Endpoint ID to limit stack operations",
)
parser.add_argument(
"--refresh-environment", "-R", action="store_true", help="List endpoints"
)
parser.add_argument(
"--list-endpoints", "-E", action="store_true", help="List endpoints"
)
parser.add_argument("--list-stacks", "-l", action="store_true", help="List stacks")
parser.add_argument("--print-all-data", "-A", action="store_true", help="List stacks")
parser.add_argument(
"--list-containers", "-c", action="store_true", help="List containers"
)
parser.add_argument("--update-stack", "-U", action="store_true", help="Update stacks")
parser.add_argument(
"--stop-containers", "-O", action="store_true", help="Stop containers"
)
parser.add_argument(
"--start-containers", "-X", action="store_true", help="Start containers"
)
parser.add_argument("--update-status", "-S", action="store_true", help="Update status")
parser.add_argument(
"--get-stack", metavar="NAME_OR_ID", help="Get stack by name or numeric id"
)
parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform")
parser.add_argument(
"--autostart", "-Z", action="store_true", help="Auto-start created stacks"
)
parser.add_argument("--start-stack", "-x", action="store_true")
parser.add_argument("--stop-stack", "-o", action="store_true")
parser.add_argument("--secrets", "-q", action="store_true")
parser.add_argument("--debug", "-D", action="store_true")
parser.add_argument("--create-stack", "-n", action="store_true")
parser.add_argument("--create-stack_new2", "-N", action="store_true")
parser.add_argument("--gpu", "-g", action="store_true")
parser.add_argument("--create-stacks", "-C", action="store_true")
parser.add_argument("--refresh-status", "-r", action="store_true")
parser.add_argument("--stack", "-s", type=str, help="Stack ID for operations")
parser.add_argument(
"--token-only", action="store_true", help="Print auth token and exit"
)
parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds")
parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode")
parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode")
args = parser.parse_args()
print("Running version:", VERSION)
print("Environment:", args.site)
if args.site is not None:
cur_config["PORTAINER_SITE"] = args.site
if args.endpoint_id is not None:
cur_config["PORTAINER_ENDPOINT_ID"] = args.endpoint_id
if args.stack is not None:
cur_config["PORTAINER_STACK"] = args.stack
if args.deploy_mode is not None:
cur_config["PORTAINER_DEPLOY_MODE"] = args.deploy_mode
if args.stack_mode is not None:
cur_config["PORTAINER_STACK_MODE"] = args.stack_mode
update_configs(cur_config)
if args.debug:
input(cur_config)
_LOG_LEVEL = "INFO"
LOG_FILE = "/tmp/portainer.log"
if _LOG_LEVEL == "DEBUG":
logging.basicConfig(
filename=LOG_FILE,
level=logging.DEBUG,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.debug("using debug logging")
elif _LOG_LEVEL == "ERROR":
logging.basicConfig(
filename=LOG_FILE,
level=logging.ERROR,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.info("using error logging")
elif _LOG_LEVEL == "SCAN":
logging.basicConfig(
filename=LOG_FILE,
level=logging.DEBUG,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.info("using scan logging")
else:
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.info("script started")
logger = logging.getLogger(__name__)
def wl(msg):
"""Write log message if debug is enabled."""
if args.debug:
print(msg)
def prompt_missing_args(args_in, defaults_in, fields):
"""
fields = [("arg_name", "Prompt text")]
"""
longest = 0
for field, text in fields:
a = text + " (default= " + cur_config["PORTAINER_" + field.upper()] + ")"
if len(a) > longest:
longest = len(a)
for field, text in fields:
value_in = getattr(args_in, field)
default = defaults_in.get(f"PORTAINER_{field}".upper())
cur_site = defaults_in.get("PORTAINER_SITE".upper())
if value_in is None:
if default is not None:
prompt_text = f"{text} (default={default}) : "
# value_in = input(prompt) or default
if field == "site":
commands = ["portainer", "port"]
elif field == "deploy_mode":
commands = ["git", "upload"]
elif field == "stack_mode":
commands = ["swarm", "compose"]
elif field == "endpoint_id":
commands = por.endpoints_names
elif field == "stack":
if args.action == "create_stack":
commands = [
'authentik', 'bitwarden', 'bookstack', 'dockermon', 'fail2ban', 'gitea', 'gitlab', 'grafana',
'home-assistant', 'homepage', 'immich', 'influxdb', 'jupyter', 'kestra', 'mailu3',
'mealie', 'mediacenter', 'mosquitto', 'motioneye', 'n8n', 'nextcloud', 'nginx',
'node-red', 'octoprint', 'ollama', 'pihole', 'portainer-ce', 'rancher', 'registry',
'regsync', 'semaphore', 'unifibrowser', 'uptime-kuma', 'watchtower', 'wazuh', 'webhub',
'wud', 'zabbix-server']
else:
commands = []
if por._debug:
input(por.stacks_all)
# print(defaults_in[f"PORTAINER_ENDPOINT_ID".upper()])
try:
for s in por.stacks_all[
defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]
]["by_name"].keys():
commands.append(s)
except KeyError:
print(
"No stacks found for endpoint",
defaults_in[f"PORTAINER_ENDPOINT_ID".upper()],
)
sys.exit(1)
else:
commands = []
completer = WordCompleter(
commands, ignore_case=True, match_middle=False
)
try:
value_in = (
prompt(
f" >> {prompt_text}",
completer=completer,
placeholder=default,
)
or default
)
except KeyboardInterrupt:
print("\n^C received — exiting cleanly.")
sys.exit(0)
# value_in = input_with_default(prompt_text, default, longest+2)
else:
# value_in = input(f"{text}: ")
commands = ["start", "stop", "status", "restart", "reload", "exit"]
completer = WordCompleter(commands, ignore_case=True)
try:
value_in = (
prompt(
f" >> {text} {default}",
completer=completer,
placeholder=default,
)
or default
)
except KeyboardInterrupt:
print("\n^C received — exiting cleanly.")
sys.exit(0)
# value_in = input_with_default(text, default, longest+2)
if por._debug:
print("Value entered:", value_in)
defaults_in[f"PORTAINER_{field}".upper()] = value_in
setattr(args, field, value_in)
os.environ[field] = value_in
if field == "site" and value_in != cur_site:
por.get_site(value_in)
if por._debug:
print(f"{defaults_in} {field} {value_in}")
if field == "endpoint_id" and value_in != defaults_in.get(
"PORTAINER_ENDPOINT_ID".upper()
):
print("refreshing environment")
por.get_endpoints()
with open("/myapps/portainer.conf", "w") as f:
for k in defaults_in.keys():
f.write(f"{k}={defaults_in[k]}\n")
return args
if __name__ == "__main__":
# Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below.
# token = get_portainer_token(base,"admin","l4c1j4yd33Du5lo") # or get_portainer_token(base, "admin", "secret")
def signal_handler(sig, frame):
logger.warning("Killed manually %s, %s", sig, frame)
print("\nTerminated by user")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
os.system("cls" if os.name == "nt" else "clear")
if args.action is None:
actions = [
"create_stack",
"delete_stack",
"stop_stack",
"start_stack",
"list_stacks",
"update_stack",
"secrets",
"print_all_data",
"list_endpoints",
"list_containers",
"stop_containers",
"start_containers",
"refresh_environment",
"refresh_status",
"update_status",
]
print("Possible actions: \n")
i = 1
for a in actions:
print(f" > {i:>2}. {a}")
i += 1
ans = input("\nSelect action to perform: ")
args.action = actions[int(ans) - 1]
os.system("cls" if os.name == "nt" else "clear")
# Example: list endpoints
por = Portainer(cur_config["PORTAINER_SITE"], timeout=args.timeout)
por.set_defaults(cur_config)
if args.debug:
por._debug = True
if args.action == "secrets":
if args.endpoint_id is None:
args.endpoint_id = input("Endpoint ID is required for creating secrets : ")
secrets = {
"gitea_runner_registration_token": "8nmKqJhkvYwltmNfF2o9vs0tzo70ufHSQpVg6ymb",
"influxdb2-admin-token": "l4c1j4yd33Du5lo",
"ha_influxdb2_admin_token": "l4c1j4yd33Du5lo",
"wordpress_db_password": "wordpress",
"wordpress_root_db_password": "wordpress",
}
for key, value in secrets.items():
res = por.create_secret(key, value, args.endpoint_id, args.timeout)
print(res)
sys.exit()
if args.action == "delete_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
)
input(
f"Delete stack {args.stack} on endpoint {args.endpoint_id}. Press ENTER to continue..."
)
por.delete_stack(
args.endpoint_id,
args.stack,
)
sys.exit()
if args.action == "create_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
("stack_mode", "Stack mode (swarm or compose)"),
("deploy_mode", "Deploy mode (git or upload)"),
],
)
por.create_stack(
args.endpoint_id,
args.stack,
args.deploy_mode,
args.autostart,
args.stack_mode,
)
sys.exit()
if args.action == "stop_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
)
por.stop_stack(args.stack, args.endpoint_id)
sys.exit()
if args.action == "start_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
)
por.start_stack(args.stack, args.endpoint_id)
sys.exit()
if args.action == "list_stacks":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
],
)
por.print_stacks(args.endpoint_id)
# print(json.dumps(por.all_data, indent=2))
sys.exit()
if args.action == "list_containers":
print("Getting containers")
por.get_containers(args.endpoint_id, args.stack)
sys.exit()
if args.action == "update_stack":
print("Updating stacks")
por.update_stack(args.endpoint_id, args.stack, args.autostart)
sys.exit()
if args.action == "print_all_data":
print(json.dumps(por.all_data, indent=2))
sys.exit()
if args.action == "update_status":
por.update_status(args.endpoint_id, args.stack)
sys.exit()
if args.action == "list_endpoints":
eps = por.get_endpoints()
export_data = []
for i in eps["by_id"]:
export_data.append([i, eps["by_id"][i]])
headers = ["EndpointId", "Name"]
print(tabulate(export_data, headers=headers, tablefmt="github"))
sys.exit()
if args.action == "stop_containers":
if por.all_data["endpoints_status"][args.endpoint_id] != 1:
print(f"Endpoint {por.get_endpoint_name(args.endpoint_id)} is offline")
sys.exit()
print(f"Stopping containers on {por.get_endpoint_name(args.endpoint_id)}")
cont = []
for c in por.all_data["containers"][args.endpoint_id]:
if args.stack in (c, "all"):
cont += por.all_data["containers"][args.endpoint_id][c]
por.stop_containers(args.endpoint_id, cont)
sys.exit()
if args.action == "start_containers":
print("Starting containers")
cont = []
# input(json.dumps(por.all_data, indent=2))
for c in por.all_data["containers"][args.endpoint_id]:
if args.stack in (c, "all"):
cont += por.all_data["containers"][args.endpoint_id][c]
por.start_containers(args.endpoint_id, cont)
sys.exit()
if args.action == "start_containers":
print("Starting containers")
cont = []
# input(json.dumps(por.all_data,indent=2))
for c in por.all_data["containers"][args.endpoint_id]:
if args.stack in (c, "all"):
cont += por.all_data["containers"][args.endpoint_id][c]
por.start_containers(args.endpoint_id, cont)
sys.exit()
if args.action == "refresh_environment":
cont = por.refresh()
sys.exit()
if args.action == "refresh_status":
if args.stack == "all":
print("Stopping all stacks...")
stcks = por.get_stacks(endpoint_id=args.endpoint_id)
else:
por.refresh_status(args.stack_id)