Files
portainer/port.py
2025-12-13 13:33:02 +01:00

1097 lines
44 KiB
Python

'''Portainer API wrapper module.'''
import os
from concurrent.futures import ThreadPoolExecutor
import json
import sys
import uuid
import shutil
import time
import logging
import base64
import tabulate
from git import Repo
import requests
import hvac
from prompt_toolkit import prompt
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.shortcuts import checkboxlist_dialog
from prompt_toolkit.shortcuts import radiolist_dialog
logger = logging.getLogger(__name__)
class Portainer:
"""
Client for the HTTP API of a single Portainer site.
Instantiate with a site key (see ``get_site``), an optional ``args``
namespace and a request timeout. The constructor resolves the site's
base URL and API token and then loads endpoints, stacks and containers;
the remaining methods perform API operations against that site.
"""
def __init__(self, site, args=None, timeout=120):
    """Create a client for one Portainer site and preload its inventory.

    Args:
        site: Site key passed to ``get_site`` to pick the base URL and
            API token.
        args: Optional namespace of caller options. ``endpoint_id`` is read
            here when present; other methods read further attributes from it
            (``client``, ``service_id``, ``update``).
        timeout: Default request timeout in seconds, stored as
            ``self.timeout``.
    """
    self.base_url = None
    self.token = None
    self.args = args
    self.action = None
    self._debug = False
    self.timeout = timeout
    self.cur_config = None
    self.stack_name = None
    self.stacks_all = {}
    self.stack_id = None
    self.stack_ids = []
    self.endpoint_name = None
    # ``args`` defaults to None, so read the attribute defensively instead
    # of crashing with AttributeError.
    self.endpoint_id = getattr(args, "endpoint_id", None)
    # self.git_url = "https://gitlab.sectorq.eu/home/docker-compose.git"
    self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
    self.repo_dir = "/tmp/docker-compose"
    self.basic_stacks = [
        "pihole",
        "nginx",
        "mosquitto",
        "webhub",
        "authentik",
        "bitwarden",
        "mailu3",
        "home-assistant",
        "homepage",
    ]
    # The "nas" and "rack" endpoints deploy the same extra stacks; each
    # attribute still gets its own list object.
    storage_extras = [
        "gitlab",
        "bookstack",
        "dockermon",
        "gitea",
        "grafana",
        "immich",
        "jupyter",
        "kestra",
        "mealie",
    ]
    self.nas_stacks = self.basic_stacks + storage_extras
    self.m_server_stacks = self.basic_stacks + [
        "immich",
        "zabbix-server",
        "gitea",
        "unifibrowser",
        "mediacenter",
        "watchtower",
        "wazuh",
        "octoprint",
        "motioneye",
        "kestra",
        "bookstack",
        "wud",
        "uptime-kuma",
        "registry",
        "regsync",
        "dockermon",
        "grafana",
        "nextcloud",
        "semaphore",
        "node-red",
        "test",
        "jupyter",
        "paperless",
        "mealie",
        "n8n",
        "ollama",
        "rancher",
    ]
    self.rpi5_stacks = self.basic_stacks + ["gitlab", "bookstack", "gitea"]
    self.rack_stacks = self.basic_stacks + storage_extras
    self.log_mode = False
    self.hw_mode = False
    self.all_data = {"containers": {}, "stacks": {}, "endpoints": {}, "services": {}}
    # NOTE: get_site() already refreshes endpoints and stacks; the repeated
    # calls are kept so start-up behaviour is unchanged.
    self.get_site(site)
    self.get_endpoints()
    self.get_stacks()
    self.get_containers()
def set_defaults(self, config):
    '''Remember ``config`` as the current configuration (``self.cur_config``).'''
    setattr(self, "cur_config", config)
def get_site(self, site):
    """Select the Portainer site: set ``base_url`` and ``token``, then reload.

    For the known sites ``"portainer"`` and ``"port"`` the API token is read
    through ``self.args.client`` (a KV v2 ``read_secret_version`` call). Any
    other value falls back to the default URL and a token taken from the
    ``PORTAINER_TOKEN`` environment variable, or the legacy built-in token
    when that variable is unset. ``PORTAINER_URL`` overrides the base URL in
    every case. Endpoints and stacks are refreshed afterwards.

    Args:
        site: Site key (``"portainer"``, ``"port"`` or anything else for the
            fallback).
    """
    default_url = "https://portainer.sectorq.eu/api"
    # site key -> (default base URL, secret path holding the API token)
    secret_backed_sites = {
        "portainer": (default_url, "portainer/token"),
        "port": ("https://port.sectorq.eu/api", "port/token"),
    }
    if site in secret_backed_sites:
        fallback_url, token_path = secret_backed_sites[site]
        self.base_url = os.getenv("PORTAINER_URL", fallback_url)
        secret = self.args.client.secrets.kv.v2.read_secret_version(path=token_path)
        self.token = secret["data"]["data"]["value"]
    else:
        self.base_url = os.getenv("PORTAINER_URL", default_url)
        # SECURITY(review): the literal below is a credential committed to
        # source. It is kept only as a backward-compatible fallback; rotate
        # it and supply the token via PORTAINER_TOKEN or the secret store.
        self.token = os.getenv(
            "PORTAINER_TOKEN", "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc="
        )
    self.get_endpoints()
    self.get_stacks()
def _is_number(self, s):
    """Return True when ``s`` can be converted by ``float()``.

    ``None`` and other non-convertible objects make ``float()`` raise
    ``TypeError`` rather than ``ValueError``; both mean "not a number".
    """
    try:
        float(s)
    except (TypeError, ValueError):
        return False
    return True
def gotify_message(self, message):
    '''Send a notification message via Gotify.

    The notification is best effort: transport errors and non-success
    responses are logged instead of interrupting the calling operation.

    Args:
        message: Text of the notification body.
    '''
    payload = {
        "title": "Updates in Portainer",
        "message": message,
        "priority": 5
    }
    try:
        response = requests.post(
            "https://gotify.sectorq.eu/message",
            data=payload,
            # SECURITY(review): application key committed to source; move it
            # to configuration or the secret store.
            headers={"X-Gotify-Key": "ASn_fIAd5OVjm8c"},
            # Without a timeout an unreachable server blocks forever.
            timeout=getattr(self, "timeout", 120),
        )
    except requests.RequestException as exc:
        logger.warning("Gotify notification failed: %s", exc)
        return
    if not response.ok:
        logger.warning(
            "Gotify notification rejected: %s %s", response.status_code, response.text
        )
def _api_get(self, path, timeout=120):
    """Issue an authenticated GET for ``path`` and return the decoded JSON.

    ``path`` is appended to ``self.base_url`` (trailing slash removed).
    A non-success HTTP status raises through ``raise_for_status()``.
    """
    response = requests.get(
        "{}{}".format(self.base_url.rstrip("/"), path),
        headers={"X-API-Key": f"{self.token}"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
def _api_post(self, path, json="", timeout=120):
    """Issue an authenticated POST with ``json`` as the JSON body.

    Returns the raw response text; the HTTP status is not checked, so
    callers inspect the text themselves. Note that the ``json`` parameter
    shadows the ``json`` module inside this method.
    """
    request_options = {
        "headers": {"X-API-Key": f"{self.token}"},
        "json": json,
        "timeout": timeout,
    }
    target = "{}{}".format(self.base_url.rstrip("/"), path)
    return requests.post(target, **request_options).text
def _api_put(self, path, json="", timeout=120):
    """Issue an authenticated PUT with ``json`` as the JSON body.

    Returns the raw response text; the HTTP status is not checked. The
    ``json`` parameter shadows the ``json`` module inside this method.
    """
    request_options = {
        "headers": {"X-API-Key": f"{self.token}"},
        "json": json,
        "timeout": timeout,
    }
    target = "{}{}".format(self.base_url.rstrip("/"), path)
    return requests.put(target, **request_options).text
def _api_post_file(self, path, endpoint_id, name, envs, file, timeout=120):
    """Issue an authenticated multipart POST that uploads ``file``.

    The form fields are ``EndpointId``, ``Name`` and ``Env`` (``envs``
    serialised as JSON text). A non-success HTTP status raises through
    ``raise_for_status()``; otherwise the decoded JSON reply is returned.
    """
    form_fields = {
        "EndpointId": endpoint_id,
        "Name": name,
        "Env": json.dumps(envs),
    }
    response = requests.post(
        "{}{}".format(self.base_url.rstrip("/"), path),
        headers={"X-API-Key": f"{self.token}"},
        files=file,
        data=form_fields,
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
def _api_post_no_body(self, path, timeout=120):
    """Issue an authenticated POST without a request body.

    Returns the raw response text; the HTTP status is not checked.
    """
    target = "{}{}".format(self.base_url.rstrip("/"), path)
    auth_header = {"X-API-Key": f"{self.token}"}
    return requests.post(target, headers=auth_header, timeout=timeout).text
def _api_delete(self, path, timeout=120):
    """Issue an authenticated DELETE for ``path``.

    A non-success HTTP status raises through ``raise_for_status()``;
    otherwise the numeric status code is returned.
    """
    response = requests.delete(
        "{}{}".format(self.base_url.rstrip("/"), path),
        headers={"X-API-Key": f"{self.token}"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.status_code
def refresh(self):
    '''Refresh endpoints, stacks and containers from Portainer.

    Returns:
        Always ``True``.
    '''
    self.get_endpoints()
    # These are bound-method calls: passing ``self`` again would hand the
    # instance over as the endpoint argument and fail in the lookup.
    self.get_stacks()
    self.get_containers()
    return True
def get_stacks(self, endpoint_id="all", timeout=20):
    '''Get the stacks of one endpoint, or of all endpoints.

    Besides returning the matching stack records, this rebuilds the lookup
    tables ``self.stacks_all`` / ``self.all_data["stacks"]`` and
    ``self.all_data["webhooks"]``. Both are keyed by endpoint id *and* by
    endpoint name. ``self.stacks`` keeps the unfiltered API reply.

    Args:
        endpoint_id: Endpoint id or name, or ``"all"``.
        timeout: Request timeout in seconds.

    Returns:
        List of stack dicts for the requested endpoint(s). Stacks on the
        ignored endpoints, or on endpoints missing from ``self.endpoints``,
        are left out.
    '''
    if endpoint_id != "all":
        endpoint_id = self.get_endpoint_id(endpoint_id)
    stacks = self._api_get("/stacks", timeout=timeout)
    # Endpoint ids whose stacks are always ignored (kept from the original).
    ignored_endpoints = {20, 39, 41}
    endpoint_names = self.endpoints["by_id"]
    self.stacks_all = {}
    webhooks = {}
    selected = []
    for s in stacks:
        ep_id = s["EndpointId"]
        if ep_id in ignored_endpoints:
            continue
        ep_name = endpoint_names.get(ep_id)
        if ep_name is None:
            # Stack belongs to an endpoint that is not (or no longer) known;
            # indexing it by name used to raise KeyError.
            logger.debug(
                "Skipping stack %s: unknown endpoint %s", s.get("Name"), ep_id
            )
            continue

        auto_update = s.get("AutoUpdate")
        has_hook = False
        hook = None
        if isinstance(auto_update, dict) and "Webhook" in auto_update:
            has_hook, hook = True, auto_update["Webhook"]
        elif auto_update is not None and s.get("Webhook"):
            # AutoUpdate carries no webhook: use the stack-level one.
            has_hook, hook = True, s["Webhook"]

        # Every table is mirrored under the endpoint id and its name.
        for key in (ep_id, ep_name):
            # The "webhook" placeholder entry is part of the existing
            # data shape and is preserved.
            hooks = webhooks.setdefault(key, {"webhook": {}})
            index = self.stacks_all.setdefault(key, {"by_id": {}, "by_name": {}})
            index["by_id"][s["Id"]] = s["Name"]
            index["by_name"][s["Name"]] = s["Id"]
            if has_hook:
                hooks[s["Name"]] = hook

        if endpoint_id == "all" or ep_id == endpoint_id:
            selected.append(s)

    self.stacks = stacks
    self.all_data["stacks"] = self.stacks_all
    self.all_data["webhooks"] = webhooks
    return selected
def get_services(self, endpoint, timeout=30):
    '''Return the Docker services reported for ``endpoint``.

    ``endpoint`` is resolved through ``get_endpoint_id`` and the endpoint's
    ``docker/services`` resource is fetched with the given timeout.
    '''
    resolved_id = self.get_endpoint_id(endpoint)
    return self._api_get(
        "/endpoints/{}/docker/services".format(resolved_id), timeout=timeout
    )
def update_status(self, endpoint, stack):
    '''Fetch and print the image status of ``stack`` on ``endpoint``.

    The stack id is looked up by name in ``self.all_data["stacks"]``; the
    API reply is printed and nothing is returned.
    '''
    stack_id = self.all_data["stacks"][endpoint]["by_name"][stack]
    print(self._api_get("/stacks/{}/images_status?refresh=true".format(stack_id)))
def get_endpoint_id(self, endpoint):
    '''Resolve an endpoint id or name to the endpoint id.

    Also records the result in ``self.endpoint_id`` / ``self.endpoint_name``.

    Args:
        endpoint: Endpoint id (int or numeric string) or endpoint name.

    Returns:
        The endpoint id as stored in ``self.endpoints``.

    Raises:
        KeyError: If the endpoint is unknown.
    '''
    if self._is_number(endpoint):
        # The id map is keyed by the API's integer ids, so a numeric string
        # such as "5" has to be normalised before the lookup.
        endpoint_id = int(float(endpoint))
        self.endpoint_id = endpoint_id
        self.endpoint_name = self.endpoints["by_id"][endpoint_id]
        return endpoint_id
    self.endpoint_name = endpoint
    self.endpoint_id = self.endpoints["by_name"][endpoint]
    return self.endpoint_id
def get_endpoint_name(self, endpoint):
    '''Resolve an endpoint id or name to the endpoint name.

    Also records the result in ``self.endpoint_id`` / ``self.endpoint_name``.

    Args:
        endpoint: Endpoint id (int or numeric string) or endpoint name.

    Returns:
        The endpoint name.

    Raises:
        KeyError: If the endpoint is unknown.
    '''
    if self._is_number(endpoint):
        # Normalise numeric strings: the id map uses integer keys.
        endpoint_id = int(float(endpoint))
        self.endpoint_id = endpoint_id
        self.endpoint_name = self.endpoints["by_id"][endpoint_id]
        return self.endpoint_name
    self.endpoint_name = endpoint
    self.endpoint_id = self.endpoints["by_name"][endpoint]
    return endpoint
def get_containers(self, endpoint="all", stack="all", timeout=30):
    '''Get container names for an endpoint/stack, refreshing the cache.

    With ``endpoint="all"`` and ``stack="all"`` every online endpoint is
    queried stack by stack (filtering on the ``com.docker.compose.project``
    label) and ``self.all_data["containers"]`` is rebuilt as
    ``{endpoint_name: {stack_name: [container_name, ...]}}``.

    With a specific ``endpoint`` the cache is refreshed first and the names
    for ``stack`` (or for all stacks of that endpoint when ``stack="all"``)
    are returned; unknown endpoints or stacks yield an empty list.

    Args:
        endpoint: Endpoint name as used in the cache, or ``"all"``.
        stack: Stack name, or ``"all"``.
        timeout: Currently unused; requests use ``_api_get``'s default.

    Returns:
        List of container names (leading ``/`` removed).
    '''
    if endpoint != "all":
        self.get_containers()
        cached = self.all_data["containers"].get(endpoint, {})
        if stack == "all":
            return [name for names in cached.values() for name in names]
        return list(cached.get(stack, []))

    collected = []
    if stack != "all":
        # Nothing is fetched for "one stack on all endpoints".
        return collected

    by_endpoint = {}
    for ep_id, ep_name in self.all_data["endpoints"]["by_id"].items():
        if ep_id not in self.all_data["stacks"]:
            continue
        if self.all_data["endpoints_status"][ep_id] != 1:
            # Endpoint is offline.
            continue
        for stack_name in self.all_data["stacks"][ep_id]["by_name"]:
            path = (
                f"/endpoints/{ep_id}/docker/containers/json"
                f'?all=1&filters={{"label": ["com.docker.compose.project={stack_name}"]}}'
            )
            logger.info("request : %s", path)
            try:
                containers = self._api_get(path)
            except Exception as exc:
                # Best effort: one failing endpoint/stack must not abort the
                # whole inventory. A distinct name keeps the stack name
                # intact (the old code rebound it to the exception).
                print(f"failed to get containers from {path}: {exc}")
                continue
            try:
                names = [c["Names"][0].replace("/", "") for c in containers]
            except (KeyError, IndexError, TypeError, AttributeError) as exc:
                logger.debug(
                    "Exception while getting containers for stack %s on endpoint %s: %s",
                    stack_name,
                    ep_name,
                    exc,
                )
                continue
            collected.extend(names)
            by_endpoint.setdefault(ep_name, {})[stack_name] = names
    self.all_data["containers"] = by_endpoint
    return collected
def stop_containers(self, endpoint, containers, timeout=130):
    '''Stop the given containers on an endpoint, in parallel.

    Args:
        endpoint: Endpoint name.
        containers: Iterable of container names or ids.
        timeout: Currently unused; requests use the helper's default.

    Nothing is sent when the endpoint is reported offline. Failures of
    individual stop requests are logged and do not affect the others.
    '''
    if self.all_data["endpoints_status"][endpoint] != 1:
        print(f"Endpoint {self.get_endpoint_name(endpoint)} is offline")
        # An offline endpoint cannot process the stop requests.
        return
    ep_id = self.endpoints["by_name"][endpoint]

    def stop(container):
        print(f" > Stopping {container}")
        try:
            self._api_post_no_body(
                f"/endpoints/{ep_id}/docker/containers/{container}/stop"
            )
        except Exception as exc:
            # Thread boundary: the pool results are never read, so an
            # exception would otherwise vanish silently.
            logger.warning("Stopping %s on %s failed: %s", container, endpoint, exc)

    with ThreadPoolExecutor(max_workers=10) as exe:
        exe.map(stop, containers)
def start_containers(self, endpoint, containers, timeout=130):
    '''Start the given containers on the named endpoint, in parallel.

    One start request per container is dispatched on a pool of up to ten
    worker threads; the request results are not inspected.
    '''
    target_id = self.endpoints["by_name"][endpoint]

    def start(container):
        print(f" > Starting {container}")
        self._api_post_no_body(
            f"/endpoints/{target_id}/docker/containers/{container}/start"
        )

    with ThreadPoolExecutor(max_workers=10) as pool:
        for container in containers:
            pool.submit(start, container)
def update_stack(self, endpoint, stack, autostart, timeout=130):
    '''Trigger the update webhook of one stack, or of all stacks, on an endpoint.

    Args:
        endpoint: Endpoint key as used in ``self.all_data["webhooks"]``.
        stack: Stack name, or ``"all"``.
        autostart: When false, wait two minutes after triggering and then
            stop the containers of the affected stack(s).
        timeout: Currently unused.
    '''
    hooks = self.all_data["webhooks"].get(endpoint, {})
    targets = []
    if stack == "all":
        # The table carries a "webhook" placeholder whose value is a dict,
        # and stacks without a webhook may hold "": only real ids are used.
        targets = [
            [name, hook]
            for name, hook in hooks.items()
            if isinstance(hook, str) and hook
        ]
    else:
        try:
            targets.append([stack, hooks[stack]])
        except KeyError as e:
            print(f"Error: Stack {stack} not found on endpoint {endpoint}: {e}")

    def update(item):
        print(f" > Updating {item[0]} on {endpoint}")
        ans = self._api_post_no_body(f"/stacks/webhooks/{item[1]}")
        logger.debug(
            "Update response for stack %s on endpoint %s: %s", item[0], endpoint, ans
        )

    with ThreadPoolExecutor(max_workers=10) as exe:
        exe.map(update, targets)

    if not autostart:
        # Give the redeploy time to finish before stopping the containers.
        time.sleep(120)
        cont = []
        for name, names in self.all_data["containers"].get(endpoint, {}).items():
            if stack == name or stack == "all":
                cont += names
        self.stop_containers(endpoint, cont)
def get_endpoints(self, timeout=10):
    '''Fetch all endpoints and rebuild the id/name lookup tables.

    Stores the ``{"by_id": ..., "by_name": ...}`` mapping in
    ``self.endpoints`` and ``self.all_data["endpoints"]``, the list of names
    in ``self.endpoints_names``, and a status table keyed by both id and
    name in ``self.all_data["endpoints_status"]``. Returns the mapping.
    '''
    records = self._api_get("/endpoints")
    lookup = {
        "by_id": {rec["Id"]: rec["Name"] for rec in records},
        "by_name": {rec["Name"]: rec["Id"] for rec in records},
    }
    status = {}
    for rec in records:
        status.update({rec["Id"]: rec["Status"], rec["Name"]: rec["Status"]})
    self.endpoints = lookup
    self.endpoints_names = list(lookup["by_name"])
    self.all_data["endpoints"] = lookup
    self.all_data["endpoints_status"] = status
    return lookup
def get_endpoint(self, endpoint_id=None, timeout=30):
    '''Refresh the endpoint list and resolve an endpoint id or name.

    Sets ``self.endpoint_id`` and ``self.endpoint_name``.

    Args:
        endpoint_id: Endpoint id (int or numeric string) or endpoint name.
        timeout: Currently unused.

    Returns:
        The resolved endpoint id.

    Raises:
        KeyError: If the endpoint is unknown.
    '''
    self.get_endpoints()
    if self._is_number(endpoint_id):
        # The id map uses integer keys; normalise numeric strings first.
        numeric_id = int(float(endpoint_id))
        self.endpoint_name = self.endpoints["by_id"][numeric_id]
        self.endpoint_id = numeric_id
    else:
        self.endpoint_name = endpoint_id
        self.endpoint_id = self.endpoints["by_name"][endpoint_id]
    return self.endpoint_id
def get_swarm_id(self, endpoint):
    '''Return the swarm cluster id reported by the named endpoint.

    Reads ``Swarm.Cluster.ID`` from the endpoint's ``docker/info`` reply.
    '''
    info_path = "/endpoints/{}/docker/info".format(self.endpoints["by_name"][endpoint])
    return self._api_get(info_path)["Swarm"]["Cluster"]["ID"]
def get_stack(self, stack=None, endpoint_id=None, timeout=None):
    '''Look up one stack (by id or name), or all stack ids, on an endpoint.

    Refreshes the stack list first. ``self.stack_ids`` is reset on every
    call so ids from earlier lookups do not accumulate.

    Args:
        stack: Stack id, stack name, or ``"all"``.
        endpoint_id: Endpoint id (int or numeric string) or endpoint name.
        timeout: Currently unused.

    Returns:
        For ``"all"``: the list of stack ids on the endpoint. Otherwise the
        matching stack dict, or the sentinel ``1`` (after printing a
        message) when no stack matches.
    '''
    self.get_stacks(endpoint_id)
    if self._is_number(endpoint_id):
        endpoint_id = int(float(endpoint_id))
    else:
        endpoint_id = int(self.endpoints["by_name"][endpoint_id])
    self.stack_id = None
    # The original reset ``stack_id`` here, leaving ``stack_ids`` to grow
    # with every call.
    self.stack_ids = []
    if stack == "all":
        self.stack_ids = [
            s.get("Id") for s in self.stacks if s.get("EndpointId") == endpoint_id
        ]
        return self.stack_ids
    for s in self.stacks:
        if s.get("EndpointId") != endpoint_id:
            continue
        match_by_id = stack is not None and (
            s.get("Id") == stack or str(s.get("Id")) == str(stack)
        )
        match_by_name = str(s.get("Name")) == str(stack)
        if match_by_id or match_by_name:
            self.stack_id = s.get("Id")
            self.stack_name = s.get("Name")
            self.stack_ids.append(s.get("Id"))
            return s
    RED = "\033[91m"
    RESET = "\033[0m"
    print(f"{RED}{RESET} >> Stack not found: {stack}")
    # Callers compare against this sentinel; it is kept for compatibility.
    return 1
# Create (deploy) one or more stacks on an endpoint, either from the git
# repository (mode="git") or by uploading the compose file (mode="file").
#   endpoint   - endpoint id or name, resolved through get_endpoint_id().
#   stacks     - iterable of stack names; the name "all" is expanded to the
#                per-endpoint stack list (rack / m-server / rpi5 / nas).
#   mode       - "git" or "file".
#   autostart  - when false, containers of a newly created stack (except
#                "pihole") are stopped after creation.
#   stack_mode - "swarm" selects the __swarm/ compose layout and looks up the
#                swarm id; any other value is treated as standalone.
# NOTE(review): the original indentation of this file was lost, so the exact
# nesting of the statements below cannot be confirmed from this view.
def create_stack(
self,
endpoint,
stacks=None,
mode="git",
autostart=False,
stack_mode="swarm",
):
# NOTE(review): `stacks` defaults to None, which this loop cannot iterate.
for stack in stacks:
if stack_mode == "swarm":
swarm_id = self.get_swarm_id(endpoint)
p = "swarm"
env_path = f"{self.repo_dir}/__swarm/{stack}/.env"
else:
p = "standalone"
env_path = f"{self.repo_dir}/{stack}/.env"
# input(swarm_id)
self.endpoint_id = self.get_endpoint_id(endpoint)
# Always start from a fresh clone of the compose repository.
if os.path.exists(self.repo_dir):
shutil.rmtree(self.repo_dir)
else:
print(f"Folder '{self.repo_dir}' does not exist.")
Repo.clone_from(self.git_url, self.repo_dir)
if mode == "git":
path = f"/stacks/create/{p}/repository"
# print(p)
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
# "all" is replaced by the stack list of the current endpoint.
# NOTE(review): this rebinds `stacks`, the iterable of the outer loop, and
# an endpoint name outside the four listed leaves it unchanged — confirm.
if stack == "all":
if self.endpoint_name == "rack":
stacks = self.rack_stacks
elif self.endpoint_name == "m-server":
stacks = self.m_server_stacks
elif self.endpoint_name == "rpi5":
stacks = self.rpi5_stacks
elif self.endpoint_name == "nas":
stacks = self.nas_stacks
else:
stacks = [stack]
# print(json.dumps(self.stacks_all, indent=2))
# input(json.dumps(self.stacks_all, indent=2))
# NOTE(review): this inner loop reuses the name `stack` of the outer loop.
for stack in stacks:
if self.endpoint_id in self.stacks_all:
# Check if the stack exists by ID or name
stack_check = (
stack in self.stacks_all[self.endpoint_id]["by_id"]
or stack in self.stacks_all[self.endpoint_id]["by_name"]
)
if stack_check:
GREEN = "\033[92m"
RESET = "\033[0m"
print(f"{GREEN}{RESET} >> Stack {stack} already exist")
continue
print(f"Working on {stack} , stack mode: {stack_mode}")
envs = []
# Parse NAME=VALUE pairs from the .env file, skipping comments and blanks.
# NOTE(review): env_path was computed from the outer loop's stack name.
if os.path.exists(f"{env_path}"):
f = open(f"{env_path}", "r")
env_vars = f.read().splitlines()
for ev in env_vars:
if ev.startswith("#") or ev.strip() == "":
continue
if "=" in ev:
name, value = ev.split("=", 1)
envs.append({"name": name, "value": value})
f.close()
# wl(envs)
# Override selected variables from instance settings and endpoint name.
for e in envs:
# print(f"Env: {e['name']} = {e['value']}")
HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
if e["name"] == "RESTART" and self.endpoint_name == "m-server":
e["value"] = "always"
if e["name"] in HWS:
# print("Found HW_MODE env var.")
if self.hw_mode:
e["value"] = "hw"
else:
e["value"] = "cpu"
if e["name"] == "LOGGING":
# print("Found LOGGING env var.")
if self.log_mode:
e["value"] = "journald"
else:
e["value"] = "syslog"
# A fresh UUID is used as the stack's auto-update webhook id.
uid = uuid.uuid4()
# print(uid)
# Request body for the repository-based stack creation.
# NOTE(review): "repositoryAuthentication" appears twice in this literal;
# Python keeps the last occurrence (both are True).
# SECURITY(review): "repositoryPassword" is a credential committed to
# source — it should come from the secret store instead.
req = {
"Name": stack,
"Env": envs,
"AdditionalFiles": [],
"AutoUpdate": {
"forcePullImage": True,
"forceUpdate": False,
"webhook": f"{uid}",
},
"repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git",
"ReferenceName": "refs/heads/main",
"composeFile": f"{stack}/docker-compose.yml",
"ConfigFilePath": f"{stack}/docker-compose.yml",
"repositoryAuthentication": True,
"repositoryUsername": "jaydee",
"repositoryPassword": "glpat-uj-n-eEfTY398PE4vKSS",
"AuthorizationType": 0,
"TLSSkipVerify": False,
"supportRelativePath": True,
"repositoryAuthentication": True,
"fromAppTemplate": False,
"registries": [6, 3],
"FromAppTemplate": False,
"Namespace": "",
"CreatedByUserId": "",
"Webhook": "",
"filesystemPath": "/share/docker_data/portainer/portainer-data/",
"RegistryID": 4,
"isDetachedFromGit": True,
"method": "repository",
"swarmID": None,
}
# Swarm deployments use the __swarm/<stack>/<stack>-swarm.yml compose file.
if stack_mode == "swarm":
req["type"] = "swarm"
req["swarmID"] = swarm_id
req["composeFile"] = f"__swarm/{stack}/{stack}-swarm.yml"
req["ConfigFilePath"] = f"__swarm/{stack}/{stack}-swarm.yml"
if self._debug:
print(json.dumps(req))
# _api_post returns the raw response text, so this is a substring test.
res = self._api_post(path, req)
if "Id" in res:
# print("Deploy request OK")
pass
else:
print(res)
tries = 0
created = False
# Poll until get_stack() finds the new stack; it returns 1 when not found.
# NOTE(review): the sleep/retry counting sits in the except handler, so it
# runs only when get_stack raises; a plain "not found" result appears to
# loop again without delay or limit — confirm against the original nesting.
while True:
try:
# print(self.endpoint_id)
# print(stack)
if self.get_stack(stack, self.endpoint_id) != 1:
created = True
break
except Exception as e:
print(
f"Waiting for stack {stack} to be created...{tries}/50",
end="\r",
)
time.sleep(10)
tries += 1
if tries > 50:
print(
f"Error retrieving stack {stack} after creation: {self.endpoint_name}"
)
break
logger.debug(f"Exception while getting stack {stack}: {e}")
# Unless autostart is requested, stop the containers of the new stack;
# "pihole" is always left running.
if created:
if stack != "pihole":
# print(autostart)
if not autostart:
# self.get_stacks()
# self.stop_stack(stack,self.endpoint_id)
conts = self.get_containers(self.endpoint_name, stack)
# print(conts)
self.stop_containers(self.endpoint_name, conts)
if mode == "file":
print("Creating new stack from file...")
path = "/stacks/create/standalone/file"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
# NOTE(review): unlike the git branch, "nas" has no entry here.
if stack == "all":
if self.endpoint_name == "rack":
stacks = self.rack_stacks
elif self.endpoint_name == "m-server":
stacks = self.m_server_stacks
elif self.endpoint_name == "rpi5":
stacks = self.rpi5_stacks
else:
stacks = [stack]
for stack in stacks:
print(f"Working on {stack}")
# NOTE(review): `envs` is assigned only when the .env file exists, yet it
# is used below regardless — looks like a NameError (or stale value) for
# stacks without a .env file; confirm.
if os.path.exists(f"{self.repo_dir}/{stack}/.env"):
f = open(f"{self.repo_dir}/{stack}/.env", "r")
env_vars = f.read().splitlines()
envs = []
for ev in env_vars:
if ev.startswith("#") or ev.strip() == "":
continue
if "=" in ev:
name, value = ev.split("=", 1)
envs.append({"name": name, "value": value})
f.close()
# wl(envs)
for e in envs:
# print(f"Env: {e['name']} = {e['value']}")
HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
if e["name"] == "RESTART" and self.endpoint_name == "m-server":
e["value"] = "always"
if e["name"] in HWS:
print("Found HW_MODE env var.")
if self.hw_mode:
e["value"] = "hw"
else:
e["value"] = "cpu"
if e["name"] == "LOGGING":
print("Found LOGGING env var.")
if self.log_mode:
e["value"] = "journald"
else:
e["value"] = "syslog"
# Multipart upload of the compose file.
# NOTE(review): the file handle opened here is never closed, and the path
# is hard-coded instead of using self.repo_dir.
file = {
# ("filename", file_object)
"file": (
"docker-compose.yml",
open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb"),
),
}
self._api_post_file(path, self.endpoint_id, stack, envs, file)
def print_stacks(self, endpoint="all"):
    """Print a GitHub-style table of stacks and the number of rows.

    With ``endpoint`` set to a name only that endpoint's stacks are listed;
    ``"all"`` lists every stack whose endpoint is known. With ``None`` no
    filtering happens and unknown endpoints are shown as ``?``.
    """
    rows = []
    total = 0
    for item in self.get_stacks():
        if endpoint is not None:
            if item["EndpointId"] not in self.endpoints["by_id"]:
                continue
            wanted_other = (
                endpoint != "all"
                and self.endpoints["by_name"][endpoint] != item["EndpointId"]
            )
            if wanted_other:
                continue
        try:
            row = [
                item["Id"],
                item["Name"],
                self.endpoints["by_id"][item["EndpointId"]],
            ]
        except KeyError as e:
            row = [item["Id"], item["Name"], "?"]
            logger.debug(
                "KeyError getting endpoint name for stack %s : %s", item["Name"], e
            )
        rows.append(row)
        total += 1
    print(
        tabulate.tabulate(
            rows, headers=["StackID", "Name", "Endpoint"], tablefmt="github"
        )
    )
    print(f"Total stacks: {total}")
def update_service(self):
    """Check services for outdated images and optionally update them.

    The endpoint comes from ``self.args.endpoint_id``. Services are chosen
    by ``self.args.service_id``: ``None`` opens an interactive checklist,
    ``"all"`` selects every service, anything else is a single service id.
    Outdated services are restarted with an image pull unless "Check Only"
    was ticked or ``self.args.update`` is ``False``; a Gotify notification
    is sent for each outdated service either way.

    Returns:
        ``True`` after all services were checked, or ``[]`` when a status
        request failed with ``ValueError``.
    """
    markers = ("__ALL__", "__ONLY_CHECK__")
    all_services = self.get_services(self.get_endpoint_id(self.args.endpoint_id))
    service_tuples = sorted(
        ((s["ID"], s["Spec"]["Name"]) for s in all_services), key=lambda x: x[1]
    )
    service_dict = dict(service_tuples)
    every_id = [sid for sid, _ in service_tuples]

    if self.args.service_id is None:
        menu = [
            ("__ONLY_CHECK__", "[Check Only]"),
            ("__ALL__", "[Select ALL]"),
        ] + service_tuples
        # run() yields None when the dialog is cancelled.
        chosen = checkboxlist_dialog(
            title="Select one service",
            text="Choose a service:",
            values=menu,
        ).run() or []
    elif self.args.service_id == "all":
        chosen = list(every_id)
    else:
        chosen = [self.args.service_id]

    if "__ONLY_CHECK__" in chosen or self.args.update is False:
        pull = False
        print("Checking for updates only...")
    else:
        print("Checking for updates and pulling updates...")
        pull = True

    if "__ALL__" in chosen:
        service_ids = list(every_id)
    else:
        # The marker rows are menu options, not services.
        service_ids = [sid for sid in chosen if sid not in markers]

    longest = max((len(name) for name in service_dict.values()), default=0)
    ok = "\033[92m✔\033[0m"
    err = "\033[91m✖\033[0m"
    try:
        for service_id in service_ids:
            # Fall back to the id itself for services not in the listing.
            name = service_dict.get(service_id, service_id)
            print("\033[?25l", end="")  # hide the cursor while the row is drawn
            print(f"{name:<{longest}} ", end="", flush=True)
            path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true"
            try:
                resp = self._api_get(path, timeout=20)
            except ValueError as e:
                print(f"Error checking service: {e}")
                return []
            if resp['Status'] == "outdated":
                if pull:
                    self.restart_srv(service_id, pull)
                    self.gotify_message(f"Service {name} updated")
                    print(ok)
                else:
                    # Redraw the row underlined to highlight the pending update.
                    print(f"\r\033[4m{name:<{longest}}\033[0m ", end="", flush=True)
                    self.gotify_message(f"Service update available for {name}")
                    print(err)
            else:
                print(ok)
    finally:
        # Always give the cursor back, including on the early return above.
        print("\033[?25h", end="")
    return True
def restart_srv(self,service_id, pool=False):
    """Force-update a service on the current endpoint and print the reply.

    ``pool`` is sent as the ``pullImage`` flag of the request. Returns
    ``[]`` when the request raises ``ValueError``, otherwise ``None``.
    """
    request_body = {"serviceID": service_id, "pullImage": pool}
    target = "/endpoints/{}/forceupdateservice".format(self.endpoint_id)
    try:
        print(self._api_put(target, json=request_body, timeout=20))
    except ValueError as e:
        print(f"Error restarting service: {e}")
        return []
def restart_service(self, endpoint_id, service_id):
    """Interactively pick a stack and one of its services, then restart it.

    Args:
        endpoint_id: Endpoint id or name whose stacks are offered.
        service_id: Ignored; the service is always chosen in the dialog.

    Returns:
        ``True`` after the restart request was sent, ``False`` when either
        dialog was cancelled.
    """
    stacks = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
    stack_id = radiolist_dialog(
        title="Select one service",
        text="Choose a service:",
        values=stacks
    ).run()
    if stack_id is None:
        # Dialog cancelled: there is no stack name to filter services by.
        return False
    stack_name = dict(stacks).get(stack_id)
    # get_services(endpoint, timeout): the stack id used to be passed here
    # as the second argument and therefore ended up as the timeout.
    services = self.get_services(self.endpoint_name)
    svc_menu = []
    for s in services:
        try:
            if stack_name in s['Spec']['Name']:
                svc_menu.append([s['ID'], s['Spec']['Name']])
        except KeyError as e:
            print(e)
    service_id = radiolist_dialog(
        title="Select one service",
        text="Choose a service:",
        values=svc_menu
    ).run()
    if service_id is None:
        return False
    self.restart_srv(service_id, False)
    print(f"Service {service_id} : restarted")
    return True
def start_stack(self, stack=None, endpoint_id=None):
    """Start one or more stacks on an endpoint.

    Args:
        stack: A stack id/name, an iterable of them, or ``None`` to reuse
            the ids already held in ``self.stack_ids``.
        endpoint_id: Endpoint id or name; when given it is resolved first.

    Returns:
        ``True`` when every request was sent, or ``[]`` when a request
        raised ``ValueError``.
    """
    if endpoint_id is not None:
        print("Getting endpoint")
        self.get_endpoint(endpoint_id)
    if stack is not None:
        # A bare name must not be iterated character by character, and
        # every requested stack is kept (not just the last one).
        requested = [stack] if isinstance(stack, (str, int)) else list(stack)
        self.stack_ids = [self._resolve_stack_id(s, endpoint_id) for s in requested]
    known_names = self.stacks_all.get(self.endpoint_id, {}).get("by_id", {})
    for stck in self.stack_ids:
        path = f"/stacks/{stck}/start"
        if self.endpoint_id is not None:
            path += f"?endpointId={self.endpoint_id}"
        try:
            resp = self._api_post_no_body(path, timeout=20)
        except ValueError as e:
            print(f"Error starting stack: {e}")
            return []
        try:
            reply = json.loads(resp)
        except ValueError:
            # Not JSON (e.g. an empty or HTML error body).
            reply = None
        label = known_names.get(stck, stck)
        if isinstance(reply, dict) and "Id" in reply:
            print(f"Stack {label} : started")
        elif isinstance(reply, dict):
            print(f"Stack {label} : {reply.get('message', resp)}")
        else:
            print(f"Stack {label} : {resp}")
    return True
def stop_stack(self, stack, endpoint_id):
    """Stop one or more stacks on an endpoint.

    Args:
        stack: A stack id/name, an iterable of them, or ``None`` to reuse
            the ids already held in ``self.stack_ids``.
        endpoint_id: Endpoint id or name; when given it is resolved first.

    Returns:
        ``True`` when every request was sent, or ``[]`` when a request
        raised ``ValueError``.
    """
    print(f"Stopping stack {stack}")
    if endpoint_id is not None:
        self.get_endpoint(endpoint_id)
    if stack is not None:
        # A bare name must not be iterated character by character, and
        # every requested stack is kept (not just the last one).
        requested = [stack] if isinstance(stack, (str, int)) else list(stack)
        self.stack_ids = [self._resolve_stack_id(s, endpoint_id) for s in requested]
    known_names = self.stacks_all.get(self.endpoint_id, {}).get("by_id", {})
    for stck in self.stack_ids:
        path = f"/stacks/{stck}/stop"
        if self.endpoint_id is not None:
            path += f"?endpointId={self.endpoint_id}"
        try:
            resp = self._api_post_no_body(path, timeout=120)
        except ValueError as e:
            # Same exception type as start_stack; the NameError handler used
            # before could not be triggered by the request call.
            print(f"Error stopping stack: {e}")
            return []
        try:
            reply = json.loads(resp)
        except ValueError:
            # Not JSON (e.g. an empty or HTML error body).
            reply = None
        label = known_names.get(stck, stck)
        if isinstance(reply, dict) and "Id" in reply:
            print(f"Stack {label} : stopped")
        elif isinstance(reply, dict):
            print(f"Stack {label} : {reply.get('message', resp)}")
        else:
            print(f"Stack {label} : {resp}")
    return True
def _resolve_endpoint(self, endpoint_id):
    """Refresh endpoints and store the resolved id/name on the instance.

    Numeric input is treated as an endpoint id, anything else as a name.
    ``self.endpoint_id`` always ends up as an ``int``.
    """
    self.get_endpoints()
    if self._debug:
        for debug_value in (endpoint_id, self.endpoints):
            print(debug_value)
    if not self._is_number(endpoint_id):
        self.endpoint_name = endpoint_id
        self.endpoint_id = int(self.endpoints["by_name"][endpoint_id])
        return
    self.endpoint_id = int(endpoint_id)
    self.endpoint_name = self.endpoints["by_id"][self.endpoint_id]
def _resolve_stack_id(self, stack, endpoint_id):
    """Translate a stack reference into a stack id.

    Args:
        stack: ``"all"``, a numeric id, or a stack name.
        endpoint_id: Endpoint used for the name lookup.

    Returns:
        ``"all"`` unchanged, the id as ``int`` for numeric input, otherwise
        the id found by ``get_stack``.

    Raises:
        ValueError: If a stack name cannot be found on the endpoint.
    """
    if stack == "all":
        return "all"
    if self._is_number(stack):
        return int(stack)
    result = self.get_stack(stack, endpoint_id)
    # get_stack signals "not found" with the sentinel 1, which cannot be
    # indexed; report it clearly instead of failing with a TypeError.
    if not isinstance(result, dict) or "Id" not in result:
        raise ValueError(f"Stack not found: {stack}")
    return result["Id"]
def _delete_all_stacks(self, endpoint_id):
    """Delete every stack of ``endpoint_id`` (volumes included), in parallel.

    Stacks are fetched with ``get_stacks``; one DELETE request per stack is
    dispatched on a pool of up to ten threads. Returns ``"Done"``.
    """
    jobs = [
        [
            self.get_endpoint_name(endpoint_id),
            entry["Name"],
            f"/stacks/{entry['Id']}?endpointId={endpoint_id}&removeVolumes=true",
        ]
        for entry in self.get_stacks(endpoint_id)
        if int(entry["EndpointId"]) == int(endpoint_id)
    ]

    def remove(job):
        print(f"Delete stack {job[1]} from {job[0]}")
        status = self._api_delete(job[2])
        logger.debug("Deleted stack %s from %s: %s", job[1], job[0], status)

    with ThreadPoolExecutor(max_workers=10) as pool:
        for job in jobs:
            pool.submit(remove, job)
    return "Done"
def _delete_single_stack(self, stack_id, endpoint_id):
    """Delete one stack (volumes included) and report failures.

    Args:
        stack_id: Id of the stack to delete.
        endpoint_id: Id of the endpoint that owns the stack.

    Returns:
        The HTTP status code of the delete request, or ``[]`` when the
        request failed (the failure is printed).
    """
    path = f"/stacks/{stack_id}?endpointId={endpoint_id}&removeVolumes=true"
    try:
        out = self._api_delete(path, timeout=240)
    # HTTP failures raised by raise_for_status() are RequestExceptions, not
    # ValueErrors, so the "Conflict for url" check below was unreachable.
    except (ValueError, requests.RequestException) as e:
        msg = str(e)
        if "Conflict for url" in msg:
            print("Stack with this name may already exist.")
        else:
            print(f"Error deleting stack: {e}")
        return []
    return out or []
def delete_stack(self, endpoint_id=None, stack=None):
    """Delete one stack, several stacks, or all stacks on an endpoint.

    Args:
        endpoint_id: Endpoint id or name.
        stack: ``"all"``, a single stack id/name, or an iterable of them.

    Returns:
        The result of ``_delete_all_stacks`` for ``"all"``, otherwise
        ``"Done"``.
    """
    self._resolve_endpoint(endpoint_id)
    endpoint_id = self.endpoint_id
    if stack == "all":
        return self._delete_all_stacks(endpoint_id)
    if stack is None:
        targets = []
    elif isinstance(stack, (str, int)):
        # A bare name must not be iterated character by character.
        targets = [stack]
    else:
        targets = list(stack)
    for s in targets:
        print(f" >> Deleting stack {s} from endpoint {self.endpoint_name}")
        stack_id = self._resolve_stack_id(s, endpoint_id)
        self._delete_single_stack(stack_id, endpoint_id)
    return "Done"
# def delete_stack(self, endpoint_id=None, stack=None):
# """
# Return a list of stacks. If endpoint_id is provided, it will be added as a query param.
# """
# self.get_endpoints()
# if self._is_number(endpoint_id):
# self.endpoint_name = self.endpoints["by_id"][endpoint_id]
# self.endpoint_id = endpoint_id
# else:
# self.endpoint_name = endpoint_id
# self.endpoint_id = self.endpoints["by_name"][endpoint_id]
# if not self._is_number(endpoint_id):
# endpoint_id = int(self.endpoints["by_name"][endpoint_id])
# if not self._is_number(stack) and stack != "all":
# # print(stack)
# # print(self.endpoint_id)
# stack = self.get_stack(stack, self.endpoint_id)["Id"]
# if stack == "all":
# stacks = self.get_stacks(self.endpoint_id)
# paths = []
# for s in stacks:
# # print(f"Delete stack {s['Name']}")
# # print(s['EndpointId'], endpoint_id)
# if int(s["EndpointId"]) != int(endpoint_id):
# continue
# # print("Deleting stack:", s['Name'])
# path = f"/stacks/{s['Id']}"
# if endpoint_id is not None:
# path += f"?endpointId={endpoint_id}&removeVolumes=true"
# paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path])
# # input(paths)
# def delete(c):
# print(f"Delete stack {c[1]} from {c[0]} ")
# out = self._api_delete(c[2])
# logger.debug(f"Deleted stack {c[1]} from {c[0]}: {out}")
# with ThreadPoolExecutor(max_workers=10) as exe:
# exe.map(delete, paths)
# return "Done"
# else:
# path = f"/stacks/{stack}"
# if endpoint_id is not None:
# path += f"?endpointId={endpoint_id}&removeVolumes=true"
# # print(path)
# try:
# # print(path)
# # print(base_url)
# # print(token)
# stacks = self._api_delete(path)
# except Exception as e:
# # print(f"Error creating stack: {e}")
# if "Conflict for url" in str(e):
# print("Stack with this name may already exist.")
# else:
# print(f"Error deleting stack: {e}")
# # print(stacks)
# return []
# if stacks is None:
# return []
# return stacks
def create_secret(self, name, value, endpoint_id=None, timeout=None):
    """Create a Docker secret on the endpoint named by ``endpoint_id``.

    ``endpoint_id`` is looked up as an endpoint *name*; ``value`` is sent
    base64-encoded. Returns the raw response text of the POST request.
    """
    numeric_id = int(self.endpoints["by_name"][endpoint_id])
    payload = {
        "Name": name,
        "Data": base64.b64encode(value.encode()).decode(),
    }
    return self._api_post(
        f"/endpoints/{numeric_id}/docker/secrets/create", payload, timeout=timeout
    )