Files
portainer/portainer.py
2025-12-23 22:19:02 +01:00

2025 lines
77 KiB
Python
Executable File

"""
Portainer API Client module.
This module provides a wrapper for interacting with the Portainer API
to manage endpoints, stacks, and containers.
"""
import os
import logging
import signal
import sys
import json
import argparse
import hvac
from tabulate import tabulate
from prompt_toolkit import prompt
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.shortcuts import checkboxlist_dialog
from prompt_toolkit.shortcuts import radiolist_dialog
class PortainerApi:
"""
Simple wrapper around the module-level Portainer helper functions.
Instantiate with base_url and optional token/timeout and call methods
to perform API operations.
"""
def __init__(self, site, args=None, timeout=120):
self.base_url = None
self.token = None
self.args = args
self.action = None
self._debug = False
self.timeout = timeout
self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
self.stack_name = None
self.stacks_all = {}
self.stack_id = None
self.stack_ids = []
self.endpoint_name = None
self.endpoint_id = args.endpoint_id
# self.git_url = "https://gitlab.sectorq.eu/home/docker-compose.git"
self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
self.repo_dir = "/tmp/docker-compose"
self.basic_stacks = [
"pihole",
"nginx",
"mosquitto",
"webhub",
"authentik",
"bitwarden",
"mailu3",
"home-assistant",
"homepage",
]
self.nas_stacks = self.basic_stacks + [
"gitlab",
"bookstack",
"dockermon",
"gitea",
"grafana",
"immich",
"jupyter",
"kestra",
"mealie",
]
self.m_server_stacks = self.basic_stacks + [
"immich",
"zabbix-server",
"gitea",
"unifibrowser",
"mediacenter",
"watchtower",
"wazuh",
"octoprint",
"motioneye",
"kestra",
"bookstack",
"wud",
"uptime-kuma",
"registry",
"regsync",
"dockermon",
"grafana",
"nextcloud",
"semaphore",
"node-red",
"test",
"jupyter",
"paperless",
"mealie",
"n8n",
"ollama",
"rancher",
]
self.rpi5_stacks = self.basic_stacks + ["gitlab", "bookstack", "gitea"]
self.rack_stacks = self.basic_stacks + [
"gitlab",
"bookstack",
"dockermon",
"gitea",
"grafana",
"immich",
"jupyter",
"kestra",
"mealie",
]
self.log_mode = False
self.hw_mode = False
self.all_data = {"containers": {}, "stacks": {}, "endpoints": {}, "services":{}}
self.get_site(site)
self.get_endpoints()
self.get_stacks()
self.refresh_in_containers()
def set_defaults(self, config):
'''Set default configuration from provided config dictionary.'''
self.cur_config = config
def get_site(self, site):
if site == "portainer":
self.base_url = os.getenv(
"PORTAINER_URL", "https://portainer.sectorq.eu/api"
)
# self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc="
token_path = "portainer/token"
self.token = self.args.client.secrets.kv.v2.read_secret_version(path=token_path)['data']['data']['value']
elif site == "port":
self.base_url = os.getenv("PORTAINER_URL", "https://port.sectorq.eu/api")
token_path = "port/token"
self.token = self.args.client.secrets.kv.v2.read_secret_version(path=token_path)['data']['data']['value']
else:
self.base_url = os.getenv(
"PORTAINER_URL", "https://portainer.sectorq.eu/api"
)
self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc="
self.get_endpoints()
self.get_stacks()
def _is_number(self, s):
"""Check if the input string is a number."""
try:
float(s)
return True
except ValueError:
return False
def gotify_message(self, message):
payload = {
"title": "Updates in Portainer",
"message": message,
"priority": 5
}
'''Send a notification message via Gotify.'''
response = requests.post(
"https://gotify.sectorq.eu/message",
data=payload,
headers={"X-Gotify-Key": "ASn_fIAd5OVjm8c"}
)
logger.debug(response.text)
# print("Status:", response.status_code)
# print("Response:", response.text)
pass
def _api_get(self, path, timeout=120):
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
resp = requests.get(url, headers=headers, timeout=timeout)
if resp.status_code != 200:
return resp.status_code
print(f"Error: {resp.status_code} - {resp.text}")
# resp.raise_for_status()
return resp.json()
def _api_post(self, path, json="", timeout=120):
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
# print(url)
# print(json)
resp = requests.post(url, headers=headers, json=json, timeout=timeout)
return resp.text
def _api_put(self, path, json="", timeout=120):
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
# print(url)
# print(json)
resp = requests.put(url, headers=headers, json=json, timeout=timeout)
return resp.text
def _api_post_file(self, path, endpoint_id, name, envs, file, timeout=120):
# input("API POST2 called. Press Enter to continue.")
"""Example authenticated GET request to Portainer API."""
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
data = {"EndpointId": endpoint_id, "Name": name, "Env": json.dumps(envs)}
# print(data)
resp = requests.post(
url, headers=headers, files=file, data=data, timeout=timeout
)
resp.raise_for_status()
return resp.json()
def _api_post_no_body(self, path, timeout=120):
"""Example authenticated GET request to Portainer API."""
url = f"{self.base_url.rstrip('/')}{path}"
# print(url)
headers = {"X-API-Key": f"{self.token}"}
resp = requests.post(url, headers=headers, timeout=timeout)
return resp.text
def _api_delete(self, path, timeout=120):
"""Example authenticated DELETE request to Portainer API."""
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
resp = requests.delete(url, headers=headers, timeout=timeout)
# print(resp)
resp.raise_for_status()
# print(resp.status_code)
return resp.status_code
def refresh(self):
'''Refresh all data from Portainer.'''
self.get_endpoints()
self.get_stacks(self)
self.get_containers(self)
return True
def get_stacks(self, endpoint_id="all", timeout=20):
'''Get a list of stacks for a specific endpoint or all endpoints.'''
if endpoint_id != "all":
endpoint_id = self.get_endpoint_id()
path = "/stacks"
stcks = []
stacks = self._api_get(path, timeout=timeout)
self.stacks_all = {}
fail_endponts = [20, 39, 41]
# print(json.dumps(stacks,indent=2))
webhooks = {}
for s in stacks:
# print(type(s["AutoUpdate"]) )
# input(s)
if s["EndpointId"] in fail_endponts:
continue
if not s["EndpointId"] in webhooks:
try:
webhooks[s["EndpointId"]] = {"webhook": {}}
webhooks[self.endpoints["by_id"][s["EndpointId"]]] = {"webhook": {}}
except Exception as e:
logger.debug(
f"Exception while getting webhooks for endpoint {s['EndpointId']}: {e}"
)
if not s["EndpointId"] in self.stacks_all:
self.stacks_all[s["EndpointId"]] = {"by_id": {}, "by_name": {}}
self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]] = {
"by_id": {},
"by_name": {},
}
self.stacks_all[s["EndpointId"]]["by_id"][s["Id"]] = s["Name"]
self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_id"][
s["Id"]
] = s["Name"]
self.stacks_all[s["EndpointId"]]["by_name"][s["Name"]] = s["Id"]
self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_name"][
s["Name"]
] = s["Id"]
# print(s)
if "AutoUpdate" in s and s["AutoUpdate"] is not None:
if type(s["AutoUpdate"]) is dict and "Webhook" in s["AutoUpdate"]:
# print(self.endpoints["by_id"][s['EndpointId']], s['Name'], s["AutoUpdate"]['Webhook'])
# print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW")
webhooks[s["EndpointId"]][s["Name"]] = s["AutoUpdate"]["Webhook"]
webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[
"AutoUpdate"
]["Webhook"]
elif s["AutoUpdate"]["Webhook"] != "":
webhooks[s["EndpointId"]][s["Name"]] = s["Webhook"]
webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[
"Webhook"
]
# print(self.stacks_all)
if s["EndpointId"] == endpoint_id or endpoint_id == "all":
stcks.append(s)
# print(stcks)
if stcks is None:
return []
self.stacks = stacks
self.all_data["stacks"] = self.stacks_all
self.all_data["webhooks"] = webhooks
# input(json.dumps(self.stacks_all,indent=2))
return stcks
def get_services(self, endpoint, timeout=30):
'''Get a list of services for a specific stack on an endpoint.'''
# print(json.dumps(self.all_data,indent=2))
path = f"/endpoints/{self.get_endpoint_id()}/docker/services"
# print(path)
# path += f'?filters={{"label": ["com.docker.compose.project={stack}"]}}'
services = self._api_get(path, timeout=timeout)
return services
def update_status(self, endpoint, stack):
'''Get the update status of a specific stack on an endpoint.'''
path = f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true"
# input(path)
stats = self._api_get(path)
print(stats)
def get_endpoint_id(self):
'''Get endpoint ID from either ID or name input.'''
# input(self.args.endpoint_id)
if self._is_number(self.args.endpoint_id):
self.endpoint_id = self.args.endpoint_id
self.endpoint_name = self.endpoints["by_id"][self.args.endpoint_id]
return self.args.endpoint_id
else:
self.endpoint_name = self.args.endpoint_id
self.endpoint_id = self.endpoints["by_name"][self.args.endpoint_id]
return self.endpoints["by_name"][self.args.endpoint_id]
def get_endpoint_name(self, endpoint):
'''Get endpoint name from either ID or name input.'''
if self._is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.all_data["endpoints"]["by_id"][endpoint]
return self.all_data["endpoints"]["by_id"][endpoint]
else:
self.endpoint_name = endpoint
self.endpoint_id = self.all_data["endpoints"]["by_name"][endpoint]
return endpoint
def refresh_in_containers(self):
'''Get a list of containers for a specific endpoint and stack.'''
# print(json.dumps(self.all_data,indent=2))
# print(endpoint)
# print(stack)
cont = []
data = {}
eps = [ep for ep in self.all_data['endpoints']['by_id'].keys()]
#input(eps)
for endpoint in eps:
if self.all_data["endpoints_status"][endpoint] != 1:
print("Endpoint down")
# print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline")
continue
path = (
f"/endpoints/{endpoint}/docker/containers/json?all=1"
)
logging.info(f"request : {path}")
try:
containers = self._api_get(path)
#input(json.dumps(containers, indent=2))
except Exception as e:
print(f"failed to get containers from {path}: {e}")
continue
contr = []
try:
for c in containers:
#input(c)
cont.append([c["Names"][0].replace("/", ""),c["Id"], c['Image']])
contr.append([c["Names"][0].replace("/", ""), c["Id"], c['Image']])
if self.all_data["endpoints"]["by_id"][endpoint] in data:
data[self.all_data["endpoints"]["by_id"][endpoint]] = contr
data[endpoint] = contr
else:
data[self.all_data["endpoints"]["by_id"][endpoint]] = contr
data[endpoint] = contr
except Exception as e:
logger.debug(
f"Exception while getting containers for stack {e} ",
f"on endpoint {self.all_data['endpoints']['by_id'][endpoint]}: {e}",
)
self.all_data["containers"] = data
#print(cont)
return cont
def get_containers(self):
'''Get a list of containers for a specific endpoint and stack.'''
# print(json.dumps(self.all_data,indent=2))
# print(endpoint)
# print(stack)
cont = []
data = {}
if self.args.endpoint_id == "all":
eps = [ep for ep in self.all_data['endpoints']['by_id'].keys()]
else:
eps = [self.get_endpoint_id()]
for endpoint in eps:
#print(self.args.stack)
if self.args.stack in ["all", None]:
# input([id for id in self.all_data["stacks"][endpoint]['by_id'].keys()])
for e in [id for id in self.all_data["stacks"][endpoint]['by_name'].keys()]:
#input(e)
# if s not in self.all_data["stacks"]:
# continue
#input(self.all_data)
if self.all_data["endpoints_status"][endpoint] != 1:
# print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline")
continue
# input(self.all_data["stacks"][endpoint]["by_name"])
#input(e)
path = (
f"/endpoints/{endpoint}/docker/containers/json"
f'?all=1&filters={{"label": ["com.docker.compose.project={e}"]}}'
)
logging.info(f"request : {path}")
try:
containers = self._api_get(path)
#input(containers)
except Exception as e:
print(f"failed to get containers from {path}: {e}")
continue
contr = []
try:
for c in containers:
# input(c)
cont.append(c["Names"][0].replace("/", ""))
contr.append(c["Names"][0].replace("/", ""))
if self.all_data["endpoints"]["by_id"][endpoint] in data:
data[self.all_data["endpoints"]["by_id"][endpoint]][e] = contr
else:
data[self.all_data["endpoints"]["by_id"][endpoint]] = {
e: contr
}
except Exception as e:
logger.debug(
f"Exception while getting containers for stack {e} ",
f"on endpoint {self.all_data['endpoints']['by_id'][endpoint]}: {e}",
)
self.all_data["containers"] = data
#print(cont)
return cont
def stop_containers(self, endpoint, containers, timeout=130):
'''Stop containers on an endpoint.'''
if self.all_data["endpoints_status"][endpoint] != 1:
print(f"Endpoint {self.get_endpoint_name(endpoint)} is offline")
ep_id = self.endpoints["by_name"][endpoint]
def stop(c):
print(f" > Stopping {c}")
self._api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/stop")
# print(f"✔")
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(stop, containers)
# for c in containers:
# print(f" > Stopping {c}")
# self._api_post_no_body(f"/endpoints/{self.endpoints["by_name"][endpoint]}/docker/containers/{c}/stop")
# return 0
def start_containers(self, endpoint, containers, timeout=130):
'''Start containers on an endpoint.'''
ep_id = self.endpoints["by_name"][endpoint]
def stop(c):
print(f" > Starting {c}")
self._api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/start")
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(stop, containers)
def update_stack(self, args):
'''Update one stack or all stacks on an endpoint.'''
#print("Updating stacks")
stacks = self.get_stacks(endpoint_id=args.endpoint_id)
stacks_tuples = []
for s in stacks:
#print(s)
try:
stacks_tuples.append((s['AutoUpdate']['Webhook'],s['Name']))
# print(s['Name'], " : ", s['AutoUpdate']['Webhook'])
except:
stacks_tuples.append((s['Webhook'],s['Name']))
# print(s['Name'], " : ", s['Webhook'])
stacks_dict = dict(stacks_tuples)
print(stacks_dict)
#input(stacks_tuples)
# stacks_tuples = [(s['AutoUpdate']['Webhook'], s['Name']) for s in stacks if "Webhook" in s['AutoUpdate'] ]
def update(c):
print(f" > Updating {c[1]} ")
ans = self._api_post_no_body(f"/stacks/webhooks/{c[0]}")
logger.debug(
f"Update response for stack {c[0]} on endpoint {ans}"
)
# input(stacks_tuples)
if args.debug:
input(args)
stacks_tuples = sorted(stacks_tuples, key=lambda x: x[1])
stack_dict = dict(stacks_tuples)
# input(service_tuples)
if self.args.service_id is None:
#services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
stacks_tuples.insert(0, ("__ALL__", "[Select ALL]"))
stack_ids = checkboxlist_dialog(
title="Select one stack to update",
text="Choose a service:",
values=stacks_tuples
).run()
stcs = []
input(stack_ids)
if args.stack == "all":
for s in stack_dict:
stcs.append([s, stack_dict[s]])
else:
for s in stack_dict:
if s in stack_ids:
stcs.append([s, stack_dict[s]])
print(stcs)
with ThreadPoolExecutor(max_workers=10) as exe:
list(exe.map(update, stcs))
input('UPDATED')
if not args.autostart:
time.sleep(120)
cont = []
for c in self.all_data["containers"][args.endpoint_id]:
if args.stack == c or args.stack == "all":
cont += self.all_data["containers"][args.endpoint_id][c]
self.stop_containers(args.endpoint_id, cont)
def get_endpoints(self, timeout=10):
'''Get a list of all endpoints.'''
endpoints = self._api_get("/endpoints")
eps = {"by_id": {}, "by_name": {}}
eps_stats = {}
for ep in endpoints:
eps["by_id"][ep["Id"]] = ep["Name"]
eps["by_name"][ep["Name"]] = ep["Id"]
eps_stats[ep["Id"]] = ep["Status"]
eps_stats[ep["Name"]] = ep["Status"]
self.endpoints = eps
self.endpoints_names = list(eps["by_name"])
self.all_data["endpoints"] = eps
self.all_data["endpoints_status"] = eps_stats
# input(eps_stats)
# input(eps)
return eps
def get_endpoint(self, endpoint_id=None, timeout=30):
'''Get endpoint ID and name from either ID or name input.'''
self.get_endpoints()
# print(self.endpoints)
if self._is_number(endpoint_id):
self.endpoint_name = self.endpoints["by_id"][endpoint_id]
self.endpoint_id = endpoint_id
else:
self.endpoint_name = endpoint_id
self.endpoint_id = self.endpoints["by_name"][endpoint_id]
return self.endpoint_id
def get_swarm_id(self, endpoint):
'''Get the swarm ID for a specific endpoint.'''
ep_id = self.endpoints["by_name"][endpoint]
path = f"/endpoints/{ep_id}/docker/info"
stats = self._api_get(path)
return stats["Swarm"]["Cluster"]["ID"]
def get_stack(self, stack=None, endpoint_id=None, timeout=None):
self.get_stacks(endpoint_id)
if not self._is_number(endpoint_id):
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
self.stack_id = []
if stack == "all":
for s in self.stacks:
# print(s)
if endpoint_id == s.get("EndpointId"):
self.stack_ids.append(s.get("Id"))
return self.stack_ids
else:
for s in self.stacks:
# print(s)
match_by_id = (
stack is not None
and s.get("Id") == stack
and endpoint_id == s.get("EndpointId")
)
match_by_name = str(s.get("Name")) == str(stack) and endpoint_id == int(
s.get("EndpointId")
) # Ensure types match for comparison
if match_by_id or match_by_name:
# if (stack is not None and s.get("Id") == stack and endpoint_id == s.get("EndpointId"))
# or str(s.get("Name")) == str(stack) and endpoint_id == int(s.get("EndpointId")):
self.stack_id = s.get("Id")
self.stack_name = s.get("Name")
self.stack_ids.append(s.get("Id"))
return s
RED = "\033[91m"
RESET = "\033[0m"
print(ValueError(f"{RED}{RESET} >> Stack not found: {stack}"))
return 1
def create_stack(
self,
endpoint,
stacks=None,
mode="git",
autostart=False,
stack_mode="swarm",
):
for stack in stacks:
if stack_mode == "swarm":
swarm_id = self.get_swarm_id(endpoint)
p = "swarm"
env_path = f"{self.repo_dir}/__swarm/{stack}/.env"
else:
p = "standalone"
env_path = f"{self.repo_dir}/{stack}/.env"
# input(swarm_id)
self.endpoint_id = self.get_endpoint_id()
if os.path.exists(self.repo_dir):
shutil.rmtree(self.repo_dir)
else:
print(f"Folder '{self.repo_dir}' does not exist.")
Repo.clone_from(self.git_url, self.repo_dir)
if mode == "git":
path = f"/stacks/create/{p}/repository"
# print(p)
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
if stack == "all":
if self.endpoint_name == "rack":
stacks = self.rack_stacks
elif self.endpoint_name == "m-server":
stacks = self.m_server_stacks
elif self.endpoint_name == "rpi5":
stacks = self.rpi5_stacks
elif self.endpoint_name == "nas":
stacks = self.nas_stacks
else:
stacks = [stack]
# print(json.dumps(self.stacks_all, indent=2))
# input(json.dumps(self.stacks_all, indent=2))
for stack in stacks:
if self.endpoint_id in self.stacks_all:
# Check if the stack exists by ID or name
stack_check = (
stack in self.stacks_all[self.endpoint_id]["by_id"]
or stack in self.stacks_all[self.endpoint_id]["by_name"]
)
if stack_check:
GREEN = "\033[92m"
RESET = "\033[0m"
print(f"{GREEN}{RESET} >> Stack {stack} already exist")
continue
print(f"Working on {stack} , stack mode: {stack_mode}")
envs = []
if os.path.exists(f"{env_path}"):
f = open(f"{env_path}", "r")
env_vars = f.read().splitlines()
for ev in env_vars:
if ev.startswith("#") or ev.strip() == "":
continue
if "=" in ev:
name, value = ev.split("=", 1)
envs.append({"name": name, "value": value})
f.close()
# wl(envs)
for e in envs:
# print(f"Env: {e['name']} = {e['value']}")
HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
if e["name"] == "RESTART" and self.endpoint_name == "m-server":
e["value"] = "always"
if e["name"] in HWS:
# print("Found HW_MODE env var.")
if self.hw_mode:
e["value"] = "hw"
else:
e["value"] = "cpu"
if e["name"] == "LOGGING":
# print("Found LOGGING env var.")
if self.log_mode:
e["value"] = "journald"
else:
e["value"] = "syslog"
uid = uuid.uuid4()
# print(uid)
req = {
"Name": stack,
"Env": envs,
"AdditionalFiles": [],
"AutoUpdate": {
"forcePullImage": True,
"forceUpdate": False,
"webhook": f"{uid}",
},
"repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git",
"ReferenceName": "refs/heads/main",
"composeFile": f"{stack}/docker-compose.yml",
"ConfigFilePath": f"{stack}/docker-compose.yml",
"repositoryAuthentication": True,
"repositoryUsername": "jaydee",
"repositoryPassword": "glpat-uj-n-eEfTY398PE4vKSS",
"AuthorizationType": 0,
"TLSSkipVerify": False,
"supportRelativePath": True,
"repositoryAuthentication": True,
"fromAppTemplate": False,
"registries": [6, 3],
"FromAppTemplate": False,
"Namespace": "",
"CreatedByUserId": "",
"Webhook": "",
"filesystemPath": "/share/docker_data/portainer/portainer-data/",
"RegistryID": 4,
"isDetachedFromGit": True,
"method": "repository",
"swarmID": None,
}
if stack_mode == "swarm":
req["type"] = "swarm"
req["swarmID"] = swarm_id
req["composeFile"] = f"__swarm/{stack}/{stack}-swarm.yml"
req["ConfigFilePath"] = f"__swarm/{stack}/{stack}-swarm.yml"
if self._debug:
print(json.dumps(req))
res = self._api_post(path, req)
if "Id" in res:
# print("Deploy request OK")
pass
else:
print(res)
tries = 0
created = False
while True:
try:
# print(self.endpoint_id)
# print(stack)
if self.get_stack(stack, self.endpoint_id) != 1:
created = True
break
except Exception as e:
print(
f"Waiting for stack {stack} to be created...{tries}/50",
end="\r",
)
time.sleep(10)
tries += 1
if tries > 50:
print(
f"Error retrieving stack {stack} after creation: {self.endpoint_name}"
)
break
logger.debug(f"Exception while getting stack {stack}: {e}")
if created:
if stack != "pihole":
# print(autostart)
if not autostart:
# self.get_stacks()
# self.stop_stack(stack,self.endpoint_id)
conts = self.get_containers()
# print(conts)
self.stop_containers(self.endpoint_name, conts)
if mode == "file":
print("Creating new stack from file...")
path = "/stacks/create/standalone/file"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
if stack == "all":
if self.endpoint_name == "rack":
stacks = self.rack_stacks
elif self.endpoint_name == "m-server":
stacks = self.m_server_stacks
elif self.endpoint_name == "rpi5":
stacks = self.rpi5_stacks
else:
stacks = [stack]
for stack in stacks:
print(f"Working on {stack}")
if os.path.exists(f"{self.repo_dir}/{stack}/.env"):
f = open(f"{self.repo_dir}/{stack}/.env", "r")
env_vars = f.read().splitlines()
envs = []
for ev in env_vars:
if ev.startswith("#") or ev.strip() == "":
continue
if "=" in ev:
name, value = ev.split("=", 1)
envs.append({"name": name, "value": value})
f.close()
# wl(envs)
for e in envs:
# print(f"Env: {e['name']} = {e['value']}")
HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
if e["name"] == "RESTART" and self.endpoint_name == "m-server":
e["value"] = "always"
if e["name"] in HWS:
print("Found HW_MODE env var.")
if self.hw_mode:
e["value"] = "hw"
else:
e["value"] = "cpu"
if e["name"] == "LOGGING":
print("Found LOGGING env var.")
if self.log_mode:
e["value"] = "journald"
else:
e["value"] = "syslog"
file = {
# ("filename", file_object)
"file": (
"docker-compose.yml",
open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb"),
),
}
self._api_post_file(path, self.endpoint_id, stack, envs, file)
def print_stacks(self, args):
"""Print a table of stacks, optionally filtered by endpoint."""
stacks = self.get_stacks()
count = 0
data = []
stack_names = []
for stack in stacks:
# print(stack)
if args.endpoint_id is not None:
if not stack["EndpointId"] in self.endpoints["by_id"]:
continue
if args.endpoint_id != "all":
if self.endpoints["by_name"][args.endpoint_id] != stack["EndpointId"]:
continue
try:
stack_names.append(stack["Name"])
data.append(
[
stack["Id"],
stack["Name"],
self.endpoints["by_id"][stack["EndpointId"]],
]
)
except KeyError as e:
data.append([stack["Id"], stack["Name"], "?"])
logger.debug(
"KeyError getting endpoint name for stack %s : %s", stack["Name"], e
)
count += 1
data = sorted(data, key=lambda x: x[1])
headers = ["StackID", "Name", "Endpoint"]
print(tabulate.tabulate(data, headers=headers, tablefmt="github"))
print(f"Total stacks: {count}")
input("Continue...")
# print(sorted(stack_names))
def update_containers(self):
all_containers = self.all_data["containers"][self.args.endpoint_id]
#input(all_containers)
service_tuples = [(s[1], s[0]) for s in all_containers if "." not in s[0] and not s[0].startswith("runner-")]
service_tuples = sorted(service_tuples, key=lambda x: x[1])
service_dict = dict(service_tuples)
# input(service_tuples)
if self.args.service_id is None:
#services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
service_tuples.insert(0, ("__ALL__", "[Select ALL]"))
service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]"))
service_ids = checkboxlist_dialog(
title="Select one service",
text="Choose a service:",
values=service_tuples
).run()
elif self.args.service_id == "all":
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" ]
else:
service_ids = [self.args.service_id]
if self.args.update is False:
if "__ONLY_CHECK__" in service_ids:
service_ids.remove("__ONLY_CHECK__")
pull = False
print("Checking for updates only...")
else:
pull = True
print("Checking for updates and pulling updates...")
else:
pull = True
print("Checking for updates and pulling updates...")
if "__ALL__" in service_ids:
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
longest = 0
for a in service_dict.items():
# print(a[1])
if len(a[1]) > longest:
longest = len(a[1])
#print(longest)
ok = "\033[92m✔\033[0m"
err = "\033[91m✖\033[0m"
updates = []
for service_id in service_ids:
# print(self.all_data["containers"][self.args.endpoint_id])
print("\033[?25l", end="")
print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True)
path = f"/docker/{self.get_endpoint_id()}/containers/{service_id}/image_status?refresh=true"
try:
resp = self._api_get(path, timeout=20)
except ValueError as e:
print(f"Error restarting service: {e}")
return []
#print(resp)
if resp == 500:
print("?")
elif resp['Status'] == "outdated":
if pull:
#print("Recreate")
self.recreate_container(service_id, pull)
#print(f"Service {service_dict[service_id]:<{longest}} : updated")
updates.append(service_dict[service_id])
print(ok, end=" ")
for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]:
if name.startswith(service_dict[service_id]):
print(image)
else:
print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True)
#print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m")
updates.append(service_dict[service_id])
print(err, end=" ")
for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]:
if name.startswith(service_dict[service_id]):
print(image)
else:
print(ok, end=" ")
for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]:
if name.startswith(service_dict[service_id]):
print(image)
if len(updates) > 0:
if pull:
self.gotify_message(f"Services updated: {', '.join(updates)}")
else:
self.gotify_message(f"Services updates available: {', '.join(updates)}")
print("\033[?25h", end="")
return True
def update_service(self):
all_services = self.get_services(self.get_endpoint_id())
#input(all_services)
service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services]
service_tuples = sorted(service_tuples, key=lambda x: x[1])
service_dict = dict(service_tuples)
# input(service_tuples)
if self.args.service_id is None:
#services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
service_tuples.insert(0, ("__ALL__", "[Select ALL]"))
service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]"))
service_ids = checkboxlist_dialog(
title="Select one service",
text="Choose a service:",
values=service_tuples
).run()
if "__ONLY_CHECK__" in service_ids:
self.args.update = False
else:
self.args.update = True
if "__ALL__" in service_ids:
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
elif self.args.service_id == "all":
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
else:
service_ids = [self.args.service_id]
if self.args.update:
pull = True
print("Checking for updates and pulling updates...")
else:
pull = False
print("Checking for updates only...")
longest = 0
for a in service_dict.items():
if a[0] == "__ONLY_CHECK__":
continue
# print(a[1])
if len(a[1]) > longest:
longest = len(a[1])
#print(longest)
ok = "\033[92m✔\033[0m"
err = "\033[91m✖\033[0m"
for service_id in service_ids:
print("\033[?25l", end="")
print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True)
path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true"
try:
resp = self._api_get(path, timeout=20)
except ValueError as e:
print(f"Error restarting service: {e}")
return []
if resp['Status'] == "outdated":
if pull:
self.restart_srv(service_id, pull)
#print(f"Service {service_dict[service_id]:<{longest}} : updated")
self.gotify_message(f"Service {service_dict[service_id]} updated")
print(f"{ok} updated")
else:
print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True)
#print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m")
self.gotify_message(f"Service update available for {service_dict[service_id]}")
print(err)
else:
print(ok)
print("\033[?25h", end="")
return True
def update_service2(self):
all_services = self.get_services(self.get_endpoint_id(self.args.endpoint_id))
service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services]
service_tuples = sorted(service_tuples, key=lambda x: x[1])
service_dict = dict(service_tuples)
# input(service_tuples)
if self.args.service_id is None:
#services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
service_tuples.insert(0, ("__ALL__", "[Select ALL]"))
service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]"))
service_ids = checkboxlist_dialog(
title="Select one service",
text="Choose a service:",
values=service_tuples
).run()
elif self.args.service_id == "all":
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
else:
service_ids = [self.args.service_id]
if "__ONLY_CHECK__" in service_ids or self.args.update is False:
pull = False
print("Checking for updates only...")
else:
print("Checking for updates and pulling updates...")
pull = True
if "__ALL__" in service_ids:
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
longest = 0
for a in service_dict.items():
# print(a[1])
if len(a[1]) > longest:
longest = len(a[1])
#print(longest)
ok = "\033[92m✔\033[0m"
err = "\033[91m✖\033[0m"
for service_id in service_ids:
print("\033[?25l", end="")
print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True)
path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true"
try:
resp = self._api_get(path, timeout=20)
except ValueError as e:
print(f"Error restarting service: {e}")
return []
if resp['Status'] == "outdated":
if pull:
self.restart_srv(service_id, pull)
#print(f"Service {service_dict[service_id]:<{longest}} : updated")
self.gotify_message(f"Service {service_dict[service_id]} updated")
print(ok)
else:
print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True)
#print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m")
self.gotify_message(f"Service update available for {service_dict[service_id]}")
print(err)
else:
print(ok)
print("\033[?25h", end="")
return True
def recreate_container(self,service_id, pull=False):
"""Restart a service on an endpoint."""
path = f"/docker/{self.endpoint_id}/containers/{service_id}/recreate"
# print(path)
params={"pullImage": pull}
try:
resp = self._api_post(path, json=params, timeout=20)
#print(resp)
except ValueError as e:
print(f"Error restarting service: {e}")
return []
def restart_srv(self,service_id, pool=False):
"""Restart a service on an endpoint."""
path = f"/endpoints/{self.endpoint_id}/forceupdateservice"
params={"serviceID": service_id, "pullImage": pool}
try:
resp = self._api_put(path, json=params, timeout=20)
# print(resp)
except ValueError as e:
print(f"Error restarting service: {e}")
return []
def restart_service(self, endpoint_id, service_id):
stacks = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
stacks = sorted(stacks, key=lambda x: x[1])
stack_id = radiolist_dialog(
title="Select one service",
text="Choose a service:",
values=stacks
).run()
service_dict = dict(stacks)
services = self.get_services(self.endpoint_name, stack_id)
svc_name = service_dict.get(stack_id)
stack_svcs = []
svc_menu = []
for s in services:
try:
if svc_name in s['Spec']['Name']:
stack_svcs.append([s['Version']['Index'], s['Spec']['Name']])
svc_menu.append([s['ID'], s['Spec']['Name']])
except KeyError as e:
print(e)
service_id = radiolist_dialog(
title="Select one service",
text="Choose a service:",
values=svc_menu
).run()
self.restart_srv(service_id, False)
print(f"Service {service_id} : restarted")
return True
def start_stack(self, stack=None, endpoint_id=None):
"""Start one stack or all stacks on an endpoint."""
if endpoint_id is not None:
print("Getting endpoint")
self.get_endpoint(endpoint_id)
if stack is not None:
for s in stack:
self.stack_ids = [self._resolve_stack_id(s, endpoint_id)]
for stck in self.stack_ids:
path = f"/stacks/{stck}/start"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
try:
resp = self._api_post_no_body(path, timeout=20)
except ValueError as e:
print(f"Error stoping stack: {e}")
return []
if "Id" in json.loads(resp):
print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : started"
)
else:
print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : {json.loads(resp)['message']}"
)
return True
def stop_stack(self, stack, endpoint_id):
"""Stop one stack or all stacks on an endpoint."""
print(f"Stopping stack {stack}")
if endpoint_id is not None:
self.get_endpoint(endpoint_id)
if stack is not None:
for s in stack:
self.stack_ids = [self._resolve_stack_id(s, endpoint_id)]
# print(self.stack_ids)
for stck in self.stack_ids:
path = f"/stacks/{stck}/stop"
# print(path)
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
try:
resp = self._api_post_no_body(path, timeout=120)
except NameError as e:
print(f"Error stopping stack: {e}")
return []
if "Id" in json.loads(resp):
print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : stopped"
)
else:
print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : {json.loads(resp)['message']}"
)
return True
def _resolve_endpoint(self, endpoint_id):
self.get_endpoints()
if self._debug:
print(endpoint_id)
print(self.endpoints)
if self._is_number(endpoint_id):
self.endpoint_id = int(endpoint_id)
self.endpoint_name = self.endpoints["by_id"][self.endpoint_id]
else:
self.endpoint_name = endpoint_id
self.endpoint_id = int(self.endpoints["by_name"][endpoint_id])
def _resolve_stack_id(self, stack, endpoint_id):
if stack == "all":
return "all"
if not self._is_number(stack):
result = self.get_stack(stack, endpoint_id)
return result["Id"]
return int(stack)
def _delete_all_stacks(self, endpoint_id):
stacks = self.get_stacks(endpoint_id)
paths = []
for s in stacks:
if int(s["EndpointId"]) != int(endpoint_id):
continue
path = f"/stacks/{s['Id']}?endpointId={endpoint_id}&removeVolumes=true"
paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path])
def delete_item(item):
print(f"Delete stack {item[1]} from {item[0]}")
out = self._api_delete(item[2])
logger.debug("Deleted stack %s from %s: %s", item[1], item[0], out)
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(delete_item, paths)
return "Done"
def _delete_single_stack(self, stack_id, endpoint_id):
path = f"/stacks/{stack_id}?endpointId={endpoint_id}&removeVolumes=true"
# print(path)
try:
out = self._api_delete(path,timeout=240)
except ValueError as e:
msg = str(e)
if "Conflict for url" in msg:
print("Stack with this name may already exist.")
else:
print(f"Error deleting stack: {e}")
return []
return out or []
def delete_stack(self, endpoint_id=None, stack=None):
"""Delete one stack or all stacks on an endpoint."""
self._resolve_endpoint(endpoint_id)
endpoint_id = self.endpoint_id
if stack == "all":
return self._delete_all_stacks(endpoint_id)
else:
for s in stack:
print(f" >> Deleting stack {s} from endpoint {self.endpoint_name}")
stack_id = self._resolve_stack_id(s, endpoint_id)
self._delete_single_stack(stack_id, endpoint_id)
return "Done"
# def delete_stack(self, endpoint_id=None, stack=None):
# """
# Return a list of stacks. If endpoint_id is provided, it will be added as a query param.
# """
# self.get_endpoints()
# if self._is_number(endpoint_id):
# self.endpoint_name = self.endpoints["by_id"][endpoint_id]
# self.endpoint_id = endpoint_id
# else:
# self.endpoint_name = endpoint_id
# self.endpoint_id = self.endpoints["by_name"][endpoint_id]
# if not self._is_number(endpoint_id):
# endpoint_id = int(self.endpoints["by_name"][endpoint_id])
# if not self._is_number(stack) and stack != "all":
# # print(stack)
# # print(self.endpoint_id)
# stack = self.get_stack(stack, self.endpoint_id)["Id"]
# if stack == "all":
# stacks = self.get_stacks(self.endpoint_id)
# paths = []
# for s in stacks:
# # print(f"Delete stack {s['Name']}")
# # print(s['EndpointId'], endpoint_id)
# if int(s["EndpointId"]) != int(endpoint_id):
# continue
# # print("Deleting stack:", s['Name'])
# path = f"/stacks/{s['Id']}"
# if endpoint_id is not None:
# path += f"?endpointId={endpoint_id}&removeVolumes=true"
# paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path])
# # input(paths)
# def delete(c):
# print(f"Delete stack {c[1]} from {c[0]} ")
# out = self._api_delete(c[2])
# logger.debug(f"Deleted stack {c[1]} from {c[0]}: {out}")
# with ThreadPoolExecutor(max_workers=10) as exe:
# exe.map(delete, paths)
# return "Done"
# else:
# path = f"/stacks/{stack}"
# if endpoint_id is not None:
# path += f"?endpointId={endpoint_id}&removeVolumes=true"
# # print(path)
# try:
# # print(path)
# # print(base_url)
# # print(token)
# stacks = self._api_delete(path)
# except Exception as e:
# # print(f"Error creating stack: {e}")
# if "Conflict for url" in str(e):
# print("Stack with this name may already exist.")
# else:
# print(f"Error deleting stack: {e}")
# # print(stacks)
# return []
# if stacks is None:
# return []
# return stacks
def create_secret(self, name, value, endpoint_id=None, timeout=None):
"""Create a Docker secret on the specified endpoint."""
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
path = f"/endpoints/{endpoint_id}/docker/secrets/create"
encoded = base64.b64encode(value.encode()).decode()
data = {"Name": name, "Data": encoded}
return self._api_post(path, data, timeout=timeout)
VAULT_ADDR = os.environ.get("VAULT_ADDR", "http://192.168.77.101:8200")
try:
VAULT_TOKEN = os.environ.get("VAULT_TOKEN")
if VAULT_TOKEN is None:
raise KeyError
except KeyError:
VAULT_TOKEN = prompt("Valult root token : ", is_password=True)
os.environ["VAULT_TOKEN"] = VAULT_TOKEN
client = hvac.Client(url=VAULT_ADDR, token=VAULT_TOKEN)
# Check if connected
if client.is_authenticated():
print("Connected to Vault")
else:
raise Exception("Failed to authenticate with Vault")
# Specify the mount point of your KV engine
VERSION = "0.1.50"
defaults = {
"endpoint_id": "vm01",
"stack": "my_stack",
"deploy_mode": "git",
"autostart": "True",
"stack_mode": "swarm",
"site": "portainer",
}
cur_config = {}
def load_config(defaults=defaults):
'''Load configuration from /myapps/portainer.conf if it exists, else from env vars or defaults.'''
if os.path.exists("/myapps/portainer.conf"):
with open("/myapps/portainer.conf", "r") as f:
conf_data = f.read()
for line in conf_data.split("\n"):
if line.startswith("#") or line.strip() == "":
continue
key, value = line.split("=", 1)
os.environ[key.strip()] = value.strip()
cur_config[key.strip()] = value.strip()
else:
print("No /myapps/portainer.conf file found, proceeding with env vars.")
os.makedirs("/myapps", exist_ok=True)
for field in defaults.keys():
value_in = os.getenv(f"PORTAINER_{field.upper()}")
if value_in is not None:
os.environ[f"PORTAINER_{field.upper()}"] = value_in
cur_config[f"PORTAINER_{field.upper()}"] = value_in
else:
os.environ[f"PORTAINER_{field.upper()}"] = defaults[field]
cur_config[f"PORTAINER_{field.upper()}"] = defaults[field]
conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items())
# print("Using the following configuration:")
with open("/myapps/portainer.conf", "w") as f:
f.write(conf_data)
print("Configuration written to /myapps/portainer.conf")
return cur_config
a = load_config(defaults)
# ENV_VARS = [
# "PORTAINER_URL",
# "PORTAINER_SITE",
# "PORTAINER_ENDPOINT_ID",
# "PORTAINER_STACK",
# "PORTAINER_DEPLOY_MODE",
# "PORTAINER_STACK_MODE",
# ]
def update_configs(cur_config):
'''Update defaults from environment variables if set.'''
conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items())
# print("Using the following configuration:")
# print(conf_data)
with open("/myapps/portainer.conf", "w") as f:
f.write(conf_data)
print("Configuration written to /myapps/portainer.conf")
parser = argparse.ArgumentParser(
description=f"""\
Portainer helper - use env vars or pass credentials."
version: {VERSION}
""",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--base",
"-b",
default=os.getenv("PORTAINER_URL", "https://portainer.example.com"),
help="Base URL for Portainer (ENV: PORTAINER_URL)",
)
parser.add_argument("--site", "-t", type=str, default=None, help="Site")
parser.add_argument(
"--endpoint-id",
"-e",
type=str,
default=None,
help="Endpoint ID to limit stack operations",
)
parser.add_argument(
"--service-id",
"-i",
type=str,
default=None,
help="Service ID to limit service operations",
)
parser.add_argument("--stack", "-s", type=str, default=None, nargs="+", help="Stack ID for operations")
parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform")
parser.add_argument(
"--autostart", "-Z", action="store_true", help="Auto-start created stacks"
)
parser.add_argument("--update", "-u", action="store_true", help="Update service if it exists")
parser.add_argument("--debug", "-D", action="store_true")
parser.add_argument("--launcher", "-L", action="store_true")
parser.add_argument("--gpu", "-g", action="store_true")
parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds")
parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode")
parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode")
args = parser.parse_args()
print("Running version:", VERSION)
print("Environment:", args.site)
args.client = client
if args.site is not None:
cur_config["PORTAINER_SITE"] = args.site
if args.endpoint_id is not None:
cur_config["PORTAINER_ENDPOINT_ID"] = args.endpoint_id
if args.stack is not None:
cur_config["PORTAINER_STACK"] = args.stack
if args.deploy_mode is not None:
cur_config["PORTAINER_DEPLOY_MODE"] = args.deploy_mode
if args.stack_mode is not None:
cur_config["PORTAINER_STACK_MODE"] = args.stack_mode
update_configs(cur_config)
if args.debug:
input(cur_config)
_LOG_LEVEL = "DEBUG"
LOG_FILE = "/tmp/portainer.log"
if _LOG_LEVEL == "DEBUG":
logging.basicConfig(
filename=LOG_FILE,
level=logging.DEBUG,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.debug("using debug logging")
elif _LOG_LEVEL == "ERROR":
logging.basicConfig(
filename=LOG_FILE,
level=logging.ERROR,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.info("using error logging")
elif _LOG_LEVEL == "SCAN":
logging.basicConfig(
filename=LOG_FILE,
level=logging.DEBUG,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.info("using scan logging")
else:
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s : %(levelname)s : %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
logging.info("script started")
logger = logging.getLogger(__name__)
def wl(msg):
"""Write log message if debug is enabled."""
if args.debug:
print(msg)
def prompt_missing_args(args_in, defaults_in, fields, action=None,stacks=None):
"""
fields = [("arg_name", "Prompt text")]
"""
longest = 0
for field, text in fields:
a = text + " (default= " + cur_config["PORTAINER_" + field.upper()] + ")"
if len(a) > longest:
longest = len(a)
for field, text in fields:
# print(field)
value_in = getattr(args_in, field)
default = defaults_in.get(f"PORTAINER_{field}".upper())
cur_site = defaults_in.get("PORTAINER_SITE".upper())
cur_env = defaults_in.get("PORTAINER_ENVIRONMENT_ID".upper())
# print(value_in)
if value_in is None:
if default is not None:
prompt_text = f"{text} (default={default}) : "
# value_in = input(prompt) or default
if field == "site":
commands = ["portainer", "port"]
elif field == "deploy_mode":
commands = ["git", "upload"]
elif field == "stack_mode":
commands = ["swarm", "compose"]
elif field == "endpoint_id":
commands = por.endpoints_names
elif field == "stack":
if args.action == "create_stack":
# input(json.dumps(stacks, indent=2))
commands = [
'authentik', 'bitwarden', 'bookstack', 'dockermon', 'fail2ban', 'gitea', 'gitlab', 'grafana',
'hashicorp', 'home-assistant', 'homepage', 'immich', 'influxdb', 'jupyter', 'kestra', 'mailu3',
'mealie', 'mediacenter', 'mosquitto', 'motioneye', 'n8n', 'nebula', 'nextcloud', 'nginx',
'node-red', 'octoprint', 'ollama', 'onlyoffice', 'paperless-ngx', 'pihole', 'portainer-ce', 'rancher', 'registry',
'regsync', 'semaphore', 'unifibrowser', 'uptime-kuma', 'watchtower', 'wazuh', 'webhub', 'wordpress',
'wud', 'zabbix-server']
try:
print(por.all_data['stacks'][defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]]['by_name'].keys())
for s in por.all_data['stacks'][defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]]['by_name'].keys():
#print(s)
commands.remove(s)
except KeyError:
print("No stacks found for endpoint", defaults_in[f"PORTAINER_ENDPOINT_ID".upper()])
else:
commands = []
if por._debug:
input(por.stacks_all)
# print(defaults_in[f"PORTAINER_ENDPOINT_ID".upper()])
try:
for s in por.stacks_all[
defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]
]["by_name"].keys():
commands.append(s)
except KeyError:
print(
"No stacks found for endpoint",
defaults_in[f"PORTAINER_ENDPOINT_ID".upper()],
)
sys.exit(1)
else:
commands = []
completer = WordCompleter(
commands, ignore_case=True, match_middle=False
)
try:
if field == "stack":
commands.sort()
commands_tuples = [(cmd, cmd) for cmd in commands]
commands_tuples.insert(0, ("__ALL__", "[Select ALL]"))
value_in = checkboxlist_dialog(
title="Select Services",
text="Choose one or more services:",
values=commands_tuples,
).run()
if value_in is None:
print("Cancelled.")
sys.exit(0)
elif "__ALL__" in value_in:
# User selected "Select ALL"
value_in = commands # all real commands
value_in.sort()
if "pihole" in value_in:
if action == "delete_stack":
value_in.remove("pihole")
value_in.append("pihole")
else:
value_in.remove("pihole")
value_in.insert(0, "pihole")
print(" >> Stacks :", ",".join(value_in))
else:
value_in = (
prompt(
f" >> {prompt_text}",
completer=completer,
placeholder=default,
)
or default
)
except KeyboardInterrupt:
print("\n^C received — exiting cleanly.")
sys.exit(0)
# value_in = input_with_default(prompt_text, default, longest+2)
else:
# value_in = input(f"{text}: ")
commands = ["start", "stop", "status", "restart", "reload", "exit"]
completer = WordCompleter(commands, ignore_case=True)
try:
value_in = (
prompt(
f" >> {text} {default}",
completer=completer,
placeholder=default,
)
or default
)
except KeyboardInterrupt:
print("\n^C received — exiting cleanly.")
sys.exit(0)
# value_in = input_with_default(text, default, longest+2)
value_in.sort()
if por._debug:
print("Value entered:", value_in)
defaults_in[f"PORTAINER_{field}".upper()] = value_in
setattr(args, field, value_in)
if field == "site" and value_in != cur_site:
por.get_site(value_in)
if value_in == "portainer":
defaults_in["PORTAINER_ENDPOINT_ID"] = "m-s"
elif value_in == "port":
defaults_in["PORTAINER_ENDPOINT_ID"] = "vm01"
if field == "stack" and value_in != cur_site:
os.environ[field] = ",".join(value_in)
else:
os.environ[field] = value_in
if por._debug:
print(f"{defaults_in} {field} {value_in}")
if field == "endpoint_id" and value_in != defaults_in.get(
"PORTAINER_ENDPOINT_ID".upper()
):
print("refreshing environment")
por.get_endpoints()
with open("/myapps/portainer.conf", "w") as f:
for k in defaults_in.keys():
f.write(f"{k}={defaults_in[k]}\n")
return args
if __name__ == "__main__":
# Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below.
def signal_handler(sig, frame):
logger.warning("Killed manually %s, %s", sig, frame)
print("\nTerminated by user")
print("\033[?25h", end="")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
os.system("cls" if os.name == "nt" else "clear")
if args.action is None:
actions = [
("create_stack","create_stack"),
("delete_stack","delete_stack"),
("stop_stack","stop_stack"),
("start_stack","start_stack"),
("restart_service","restart_service"),
("update_service","update_service"),
("update_containers","update_containers"),
("list_stacks","list_stacks"),
("update_stack","update_stack"),
("secrets","secrets"),
("print_all_data","print_all_data"),
("list_endpoints","list_endpoints"),
("list_containers","list_containers"),
("stop_containers","stop_containers"),
("start_containers","start_containers"),
("refresh_environment","refresh_environment"),
("refresh_status","refresh_status"),
("update_status","update_status"),
]
selected_action = radiolist_dialog(
title="Select one service",
text="Choose a service:",
values=actions
).run()
print("Selected:", selected_action)
# print("Possible actions: \n")
# i = 1
# for a in actions:
# print(f" > {i:>2}. {a}")
# i += 1
# ans = input("\nSelect action to perform: ")
args.action = selected_action
os.system("cls" if os.name == "nt" else "clear")
# Example: list endpoints
por = PortainerApi(cur_config["PORTAINER_SITE"], args)
por.set_defaults(cur_config)
if args.debug:
por._debug = True
if args.action == "secrets":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
],
)
secrets = {
"gitea_runner_registration_token": "8nmKqJhkvYwltmNfF2o9vs0tzo70ufHSQpVg6ymb",
"influxdb2-admin-token": "l4c1j4yd33Du5lo",
"ha_influxdb2_admin_token": "l4c1j4yd33Du5lo",
"wordpress_db_password": "wordpress",
"wordpress_root_db_password": "wordpress",
}
for key, value in secrets.items():
res = por.create_secret(key, value, args.endpoint_id, args.timeout)
print(res)
sys.exit()
if args.action == "delete_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
action="delete_stack",
)
input(
f"\nDelete stack {','.join(args.stack)} on endpoint {args.endpoint_id}. Press ENTER to continue..."
)
por.delete_stack(
args.endpoint_id,
args.stack,
)
sys.exit()
if args.action == "create_stack":
por.action = "create_stack"
#print(cur_config)
#print(args)
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
("stack_mode", "Stack mode (swarm or compose)"),
("deploy_mode", "Deploy mode (git or upload)"),
],
por,
)
por.create_stack(
args.endpoint_id,
args.stack,
args.deploy_mode,
args.autostart,
args.stack_mode,
)
sys.exit()
if args.action == "stop_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
)
por.stop_stack(args.stack, args.endpoint_id)
sys.exit()
if args.action == "start_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
)
por.start_stack(args.stack, args.endpoint_id)
sys.exit()
if args.action == "restart_service":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.restart_service(args.endpoint_id, "lala")
sys.exit()
if args.action == "update_service":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.update_service()
if args.launcher:
input("\nPress ENTER to continue...")
sys.exit()
if args.action == "update_containers":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.update_containers()
sys.exit()
if args.action == "list_stacks":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
],
)
por.print_stacks(args)
if args.launcher:
input("Press ENTER to continue...")
# print(json.dumps(por.all_data, indent=2))
sys.exit()
if args.action == "list_containers":
print("Getting containers")
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
],
)
print("\n".join(por.get_containers()))
if args.launcher:
input("\nPress ENTER to continue...")
sys.exit()
if args.action == "update_stack":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.update_stack(args)
if args.launcher:
input("\nPress ENTER to continue...")
sys.exit()
if args.action == "print_all_data":
print(json.dumps(por.all_data, indent=2))
if args.launcher:
input("\nPress ENTER to continue...")
sys.exit()
if args.action == "update_status":
por.update_status(args.endpoint_id, args.stack)
sys.exit()
if args.action == "list_endpoints":
eps = por.get_endpoints(args)
export_data = []
for i in eps["by_id"]:
export_data.append([i, eps["by_id"][i]])
headers = ["EndpointId", "Name"]
print(tabulate(export_data, headers=headers, tablefmt="github"))
if args.launcher:
input("\nPress ENTER to continue...")
sys.exit()
if args.action == "stop_containers":
# TODO: does not work
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
],
)
if por.all_data["endpoints_status"][args.endpoint_id] != 1:
print(f"Endpoint {por.get_endpoint_name(args.endpoint_id)} is offline")
sys.exit()
print(f"Stopping containers on {por.get_endpoint_name(args.endpoint_id)}")
cont = []
for c in por.all_data["containers"][args.endpoint_id]:
if args.stack in (c, "all"):
cont += por.all_data["containers"][args.endpoint_id][c]
por.stop_containers(args.endpoint_id, cont)
sys.exit()
if args.action == "start_containers":
print("Starting containers")
cont = []
# input(json.dumps(por.all_data, indent=2))
for c in por.all_data["containers"][args.endpoint_id]:
if args.stack in (c, "all"):
cont += por.all_data["containers"][args.endpoint_id][c]
por.start_containers(args.endpoint_id, cont)
sys.exit()
if args.action == "start_containers":
print("Starting containers")
cont = []
# input(json.dumps(por.all_data,indent=2))
for c in por.all_data["containers"][args.endpoint_id]:
if args.stack in (c, "all"):
cont += por.all_data["containers"][args.endpoint_id][c]
por.start_containers(args.endpoint_id, cont)
sys.exit()
if args.action == "refresh_environment":
cont = por.refresh()
sys.exit()
if args.action == "refresh_status":
if args.stack == "all":
print("Stopping all stacks...")
stcks = por.get_stacks(endpoint_id=args.endpoint_id)
else:
por.refresh_status(args.stack_id)