mirror of
https://gitlab.sectorq.eu/jaydee/portainer.git
synced 2025-12-13 18:24:53 +01:00
1097 lines
44 KiB
Python
1097 lines
44 KiB
Python
'''Portainer API wrapper module.'''
|
|
|
|
import os
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import json
|
|
import sys
|
|
import uuid
|
|
import shutil
|
|
import time
|
|
import logging
|
|
import base64
|
|
import tabulate
|
|
from git import Repo
|
|
import requests
|
|
import hvac
|
|
from prompt_toolkit import prompt
|
|
from prompt_toolkit.completion import WordCompleter
|
|
from prompt_toolkit.shortcuts import checkboxlist_dialog
|
|
from prompt_toolkit.shortcuts import radiolist_dialog
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Portainer:
|
|
"""
|
|
Simple wrapper around the module-level Portainer helper functions.
|
|
Instantiate with base_url and optional token/timeout and call methods
|
|
to perform API operations.
|
|
"""
|
|
|
|
def __init__(self, site, args=None, timeout=120):
|
|
self.base_url = None
|
|
self.token = None
|
|
self.args = args
|
|
self.action = None
|
|
self._debug = False
|
|
self.timeout = timeout
|
|
self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
|
|
self.stack_name = None
|
|
self.stacks_all = {}
|
|
self.stack_id = None
|
|
self.stack_ids = []
|
|
self.endpoint_name = None
|
|
self.endpoint_id = args.endpoint_id
|
|
|
|
# self.git_url = "https://gitlab.sectorq.eu/home/docker-compose.git"
|
|
self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
|
|
self.repo_dir = "/tmp/docker-compose"
|
|
self.basic_stacks = [
|
|
"pihole",
|
|
"nginx",
|
|
"mosquitto",
|
|
"webhub",
|
|
"authentik",
|
|
"bitwarden",
|
|
"mailu3",
|
|
"home-assistant",
|
|
"homepage",
|
|
]
|
|
self.nas_stacks = self.basic_stacks + [
|
|
"gitlab",
|
|
"bookstack",
|
|
"dockermon",
|
|
"gitea",
|
|
"grafana",
|
|
"immich",
|
|
"jupyter",
|
|
"kestra",
|
|
"mealie",
|
|
]
|
|
self.m_server_stacks = self.basic_stacks + [
|
|
"immich",
|
|
"zabbix-server",
|
|
"gitea",
|
|
"unifibrowser",
|
|
"mediacenter",
|
|
"watchtower",
|
|
"wazuh",
|
|
"octoprint",
|
|
"motioneye",
|
|
"kestra",
|
|
"bookstack",
|
|
"wud",
|
|
"uptime-kuma",
|
|
"registry",
|
|
"regsync",
|
|
"dockermon",
|
|
"grafana",
|
|
"nextcloud",
|
|
"semaphore",
|
|
"node-red",
|
|
"test",
|
|
"jupyter",
|
|
"paperless",
|
|
"mealie",
|
|
"n8n",
|
|
"ollama",
|
|
"rancher",
|
|
]
|
|
self.rpi5_stacks = self.basic_stacks + ["gitlab", "bookstack", "gitea"]
|
|
self.rack_stacks = self.basic_stacks + [
|
|
"gitlab",
|
|
"bookstack",
|
|
"dockermon",
|
|
"gitea",
|
|
"grafana",
|
|
"immich",
|
|
"jupyter",
|
|
"kestra",
|
|
"mealie",
|
|
]
|
|
self.log_mode = False
|
|
self.hw_mode = False
|
|
self.all_data = {"containers": {}, "stacks": {}, "endpoints": {}, "services":{}}
|
|
self.get_site(site)
|
|
self.get_endpoints()
|
|
self.get_stacks()
|
|
self.get_containers()
|
|
|
|
def set_defaults(self, config):
|
|
'''Set default configuration from provided config dictionary.'''
|
|
self.cur_config = config
|
|
|
|
def get_site(self, site):
|
|
if site == "portainer":
|
|
self.base_url = os.getenv(
|
|
"PORTAINER_URL", "https://portainer.sectorq.eu/api"
|
|
)
|
|
# self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc="
|
|
token_path = "portainer/token"
|
|
self.token = self.args.client.secrets.kv.v2.read_secret_version(path=token_path)['data']['data']['value']
|
|
elif site == "port":
|
|
self.base_url = os.getenv("PORTAINER_URL", "https://port.sectorq.eu/api")
|
|
token_path = "port/token"
|
|
self.token = self.args.client.secrets.kv.v2.read_secret_version(path=token_path)['data']['data']['value']
|
|
else:
|
|
self.base_url = os.getenv(
|
|
"PORTAINER_URL", "https://portainer.sectorq.eu/api"
|
|
)
|
|
self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc="
|
|
self.get_endpoints()
|
|
self.get_stacks()
|
|
|
|
def _is_number(self, s):
|
|
"""Check if the input string is a number."""
|
|
try:
|
|
float(s)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
def gotify_message(self, message):
|
|
payload = {
|
|
"title": "Updates in Portainer",
|
|
"message": message,
|
|
"priority": 5
|
|
}
|
|
'''Send a notification message via Gotify.'''
|
|
response = requests.post(
|
|
"https://gotify.sectorq.eu/message",
|
|
data=payload,
|
|
headers={"X-Gotify-Key": "ASn_fIAd5OVjm8c"}
|
|
)
|
|
# print("Status:", response.status_code)
|
|
# print("Response:", response.text)
|
|
pass
|
|
|
|
def _api_get(self, path, timeout=120):
|
|
url = f"{self.base_url.rstrip('/')}{path}"
|
|
headers = {"X-API-Key": f"{self.token}"}
|
|
resp = requests.get(url, headers=headers, timeout=timeout)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
|
|
def _api_post(self, path, json="", timeout=120):
|
|
url = f"{self.base_url.rstrip('/')}{path}"
|
|
headers = {"X-API-Key": f"{self.token}"}
|
|
# print(url)
|
|
# print(json)
|
|
resp = requests.post(url, headers=headers, json=json, timeout=timeout)
|
|
return resp.text
|
|
|
|
def _api_put(self, path, json="", timeout=120):
|
|
url = f"{self.base_url.rstrip('/')}{path}"
|
|
headers = {"X-API-Key": f"{self.token}"}
|
|
# print(url)
|
|
# print(json)
|
|
resp = requests.put(url, headers=headers, json=json, timeout=timeout)
|
|
return resp.text
|
|
|
|
def _api_post_file(self, path, endpoint_id, name, envs, file, timeout=120):
|
|
# input("API POST2 called. Press Enter to continue.")
|
|
"""Example authenticated GET request to Portainer API."""
|
|
url = f"{self.base_url.rstrip('/')}{path}"
|
|
headers = {"X-API-Key": f"{self.token}"}
|
|
data = {"EndpointId": endpoint_id, "Name": name, "Env": json.dumps(envs)}
|
|
# print(data)
|
|
resp = requests.post(
|
|
url, headers=headers, files=file, data=data, timeout=timeout
|
|
)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
|
|
def _api_post_no_body(self, path, timeout=120):
|
|
"""Example authenticated GET request to Portainer API."""
|
|
url = f"{self.base_url.rstrip('/')}{path}"
|
|
# print(url)
|
|
headers = {"X-API-Key": f"{self.token}"}
|
|
resp = requests.post(url, headers=headers, timeout=timeout)
|
|
return resp.text
|
|
|
|
def _api_delete(self, path, timeout=120):
|
|
"""Example authenticated DELETE request to Portainer API."""
|
|
url = f"{self.base_url.rstrip('/')}{path}"
|
|
headers = {"X-API-Key": f"{self.token}"}
|
|
resp = requests.delete(url, headers=headers, timeout=timeout)
|
|
# print(resp)
|
|
resp.raise_for_status()
|
|
# print(resp.status_code)
|
|
return resp.status_code
|
|
|
|
def refresh(self):
|
|
'''Refresh all data from Portainer.'''
|
|
self.get_endpoints()
|
|
self.get_stacks(self)
|
|
self.get_containers(self)
|
|
return True
|
|
|
|
def get_stacks(self, endpoint_id="all", timeout=20):
|
|
'''Get a list of stacks for a specific endpoint or all endpoints.'''
|
|
if endpoint_id != "all":
|
|
endpoint_id = self.get_endpoint_id(endpoint_id)
|
|
path = "/stacks"
|
|
stcks = []
|
|
stacks = self._api_get(path, timeout=timeout)
|
|
self.stacks_all = {}
|
|
fail_endponts = [20, 39, 41]
|
|
# print(json.dumps(stacks,indent=2))
|
|
webhooks = {}
|
|
for s in stacks:
|
|
# print(type(s["AutoUpdate"]) )
|
|
# input(s)
|
|
if s["EndpointId"] in fail_endponts:
|
|
continue
|
|
if not s["EndpointId"] in webhooks:
|
|
try:
|
|
webhooks[s["EndpointId"]] = {"webhook": {}}
|
|
webhooks[self.endpoints["by_id"][s["EndpointId"]]] = {"webhook": {}}
|
|
except Exception as e:
|
|
logger.debug(
|
|
f"Exception while getting webhooks for endpoint {s['EndpointId']}: {e}"
|
|
)
|
|
if not s["EndpointId"] in self.stacks_all:
|
|
self.stacks_all[s["EndpointId"]] = {"by_id": {}, "by_name": {}}
|
|
self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]] = {
|
|
"by_id": {},
|
|
"by_name": {},
|
|
}
|
|
self.stacks_all[s["EndpointId"]]["by_id"][s["Id"]] = s["Name"]
|
|
self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_id"][
|
|
s["Id"]
|
|
] = s["Name"]
|
|
|
|
self.stacks_all[s["EndpointId"]]["by_name"][s["Name"]] = s["Id"]
|
|
self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_name"][
|
|
s["Name"]
|
|
] = s["Id"]
|
|
# print(s)
|
|
|
|
if "AutoUpdate" in s and s["AutoUpdate"] is not None:
|
|
if type(s["AutoUpdate"]) is dict and "Webhook" in s["AutoUpdate"]:
|
|
# print(self.endpoints["by_id"][s['EndpointId']], s['Name'], s["AutoUpdate"]['Webhook'])
|
|
# print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW")
|
|
webhooks[s["EndpointId"]][s["Name"]] = s["AutoUpdate"]["Webhook"]
|
|
webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[
|
|
"AutoUpdate"
|
|
]["Webhook"]
|
|
elif s["AutoUpdate"]["Webhook"] != "":
|
|
webhooks[s["EndpointId"]][s["Name"]] = s["Webhook"]
|
|
webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[
|
|
"Webhook"
|
|
]
|
|
|
|
# print(self.stacks_all)
|
|
if s["EndpointId"] == endpoint_id or endpoint_id == "all":
|
|
stcks.append(s)
|
|
# print(stcks)
|
|
if stcks is None:
|
|
return []
|
|
self.stacks = stacks
|
|
self.all_data["stacks"] = self.stacks_all
|
|
self.all_data["webhooks"] = webhooks
|
|
# input(json.dumps(self.stacks_all,indent=2))
|
|
return stcks
|
|
|
|
def get_services(self, endpoint, timeout=30):
|
|
'''Get a list of services for a specific stack on an endpoint.'''
|
|
# print(json.dumps(self.all_data,indent=2))
|
|
path = f"/endpoints/{self.get_endpoint_id(endpoint)}/docker/services"
|
|
# print(path)
|
|
# path += f'?filters={{"label": ["com.docker.compose.project={stack}"]}}'
|
|
services = self._api_get(path, timeout=timeout)
|
|
return services
|
|
|
|
def update_status(self, endpoint, stack):
|
|
'''Get the update status of a specific stack on an endpoint.'''
|
|
path = f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true"
|
|
# input(path)
|
|
stats = self._api_get(path)
|
|
print(stats)
|
|
|
|
def get_endpoint_id(self, endpoint):
|
|
'''Get endpoint ID from either ID or name input.'''
|
|
if self._is_number(endpoint):
|
|
self.endpoint_id = endpoint
|
|
self.endpoint_name = self.endpoints["by_id"][endpoint]
|
|
return endpoint
|
|
else:
|
|
self.endpoint_name = endpoint
|
|
self.endpoint_id = self.endpoints["by_name"][endpoint]
|
|
return self.endpoints["by_name"][endpoint]
|
|
|
|
def get_endpoint_name(self, endpoint):
|
|
'''Get endpoint name from either ID or name input.'''
|
|
if self._is_number(endpoint):
|
|
self.endpoint_id = endpoint
|
|
self.endpoint_name = self.endpoints["by_id"][endpoint]
|
|
return self.endpoints["by_id"][endpoint]
|
|
else:
|
|
self.endpoint_name = endpoint
|
|
self.endpoint_id = self.endpoints["by_name"][endpoint]
|
|
return endpoint
|
|
|
|
def get_containers(self, endpoint="all", stack="all", timeout=30):
|
|
'''Get a list of containers for a specific endpoint and stack.'''
|
|
# print(json.dumps(self.all_data,indent=2))
|
|
# print(endpoint)
|
|
# print(stack)
|
|
cont = []
|
|
data = {}
|
|
if endpoint == "all":
|
|
for s in self.all_data["endpoints"]["by_id"]:
|
|
# print(s)
|
|
if stack == "all":
|
|
if s not in self.all_data["stacks"]:
|
|
continue
|
|
if self.all_data["endpoints_status"][s] != 1:
|
|
# print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline")
|
|
continue
|
|
for e in self.all_data["stacks"][s]["by_name"]:
|
|
path = (
|
|
f"/endpoints/{s}/docker/containers/json"
|
|
f'?all=1&filters={{"label": ["com.docker.compose.project={e}"]}}'
|
|
)
|
|
logging.info(f"request : {path}")
|
|
try:
|
|
containers = self._api_get(path)
|
|
except Exception as e:
|
|
print(f"failed to get containers from {path}: {e}")
|
|
continue
|
|
contr = []
|
|
try:
|
|
for c in containers:
|
|
cont.append(c["Names"][0].replace("/", ""))
|
|
contr.append(c["Names"][0].replace("/", ""))
|
|
if self.all_data["endpoints"]["by_id"][s] in data:
|
|
data[self.all_data["endpoints"]["by_id"][s]][e] = contr
|
|
else:
|
|
data[self.all_data["endpoints"]["by_id"][s]] = {
|
|
e: contr
|
|
}
|
|
except Exception as e:
|
|
logger.debug(
|
|
f"Exception while getting containers for stack {e} ",
|
|
f"on endpoint {self.all_data['endpoints']['by_id'][s]}: {e}",
|
|
)
|
|
# print(data)
|
|
self.all_data["containers"] = data
|
|
else:
|
|
self.get_containers()
|
|
|
|
for i in self.all_data["containers"][endpoint][stack]:
|
|
cont.append(i)
|
|
return cont
|
|
|
|
def stop_containers(self, endpoint, containers, timeout=130):
|
|
'''Stop containers on an endpoint.'''
|
|
if self.all_data["endpoints_status"][endpoint] != 1:
|
|
print(f"Endpoint {self.get_endpoint_name(endpoint)} is offline")
|
|
ep_id = self.endpoints["by_name"][endpoint]
|
|
|
|
def stop(c):
|
|
print(f" > Stopping {c}")
|
|
self._api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/stop")
|
|
# print(f"✔")
|
|
|
|
with ThreadPoolExecutor(max_workers=10) as exe:
|
|
exe.map(stop, containers)
|
|
# for c in containers:
|
|
# print(f" > Stopping {c}")
|
|
# self._api_post_no_body(f"/endpoints/{self.endpoints["by_name"][endpoint]}/docker/containers/{c}/stop")
|
|
# return 0
|
|
|
|
def start_containers(self, endpoint, containers, timeout=130):
|
|
'''Start containers on an endpoint.'''
|
|
ep_id = self.endpoints["by_name"][endpoint]
|
|
|
|
def stop(c):
|
|
print(f" > Starting {c}")
|
|
self._api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/start")
|
|
|
|
with ThreadPoolExecutor(max_workers=10) as exe:
|
|
exe.map(stop, containers)
|
|
|
|
def update_stack(self, endpoint, stack, autostart, timeout=130):
|
|
'''Update one stack or all stacks on an endpoint.'''
|
|
stcs = []
|
|
if stack == "all":
|
|
for s in self.all_data["webhooks"][endpoint]:
|
|
stcs.append([s, self.all_data["webhooks"][endpoint][s]])
|
|
else:
|
|
try:
|
|
stcs.append([stack, self.all_data["webhooks"][endpoint][stack]])
|
|
except Exception as e:
|
|
print(f"Error: Stack {stack} not found on endpoint {endpoint}: {e}")
|
|
|
|
|
|
# input(stcs)
|
|
def update(c):
|
|
print(f" > Updating {c[0]} on {endpoint}")
|
|
ans = self._api_post_no_body(f"/stacks/webhooks/{c[1]}")
|
|
logger.debug(
|
|
f"Update response for stack {c[0]} on endpoint {endpoint}: {ans}"
|
|
)
|
|
|
|
def stop():
|
|
cont = []
|
|
for c in self.all_data["containers"][endpoint]:
|
|
if stack == c or stack == "all":
|
|
cont += self.all_data["containers"][endpoint][c]
|
|
self.stop_containers(endpoint, cont)
|
|
|
|
with ThreadPoolExecutor(max_workers=10) as exe:
|
|
exe.map(update, stcs)
|
|
|
|
if not autostart:
|
|
time.sleep(120)
|
|
cont = []
|
|
for c in self.all_data["containers"][endpoint]:
|
|
if stack == c or stack == "all":
|
|
cont += self.all_data["containers"][endpoint][c]
|
|
self.stop_containers(endpoint, cont)
|
|
|
|
def get_endpoints(self, timeout=10):
|
|
'''Get a list of all endpoints.'''
|
|
endpoints = self._api_get("/endpoints")
|
|
eps = {"by_id": {}, "by_name": {}}
|
|
eps_stats = {}
|
|
for ep in endpoints:
|
|
eps["by_id"][ep["Id"]] = ep["Name"]
|
|
eps["by_name"][ep["Name"]] = ep["Id"]
|
|
eps_stats[ep["Id"]] = ep["Status"]
|
|
eps_stats[ep["Name"]] = ep["Status"]
|
|
self.endpoints = eps
|
|
self.endpoints_names = list(eps["by_name"])
|
|
self.all_data["endpoints"] = eps
|
|
self.all_data["endpoints_status"] = eps_stats
|
|
# input(eps_stats)
|
|
# input(eps)
|
|
return eps
|
|
|
|
def get_endpoint(self, endpoint_id=None, timeout=30):
|
|
'''Get endpoint ID and name from either ID or name input.'''
|
|
self.get_endpoints()
|
|
# print(self.endpoints)
|
|
if self._is_number(endpoint_id):
|
|
self.endpoint_name = self.endpoints["by_id"][endpoint_id]
|
|
self.endpoint_id = endpoint_id
|
|
else:
|
|
self.endpoint_name = endpoint_id
|
|
self.endpoint_id = self.endpoints["by_name"][endpoint_id]
|
|
return self.endpoint_id
|
|
|
|
def get_swarm_id(self, endpoint):
|
|
'''Get the swarm ID for a specific endpoint.'''
|
|
ep_id = self.endpoints["by_name"][endpoint]
|
|
path = f"/endpoints/{ep_id}/docker/info"
|
|
stats = self._api_get(path)
|
|
return stats["Swarm"]["Cluster"]["ID"]
|
|
|
|
def get_stack(self, stack=None, endpoint_id=None, timeout=None):
|
|
self.get_stacks(endpoint_id)
|
|
if not self._is_number(endpoint_id):
|
|
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
|
|
self.stack_id = []
|
|
if stack == "all":
|
|
for s in self.stacks:
|
|
# print(s)
|
|
if endpoint_id == s.get("EndpointId"):
|
|
self.stack_ids.append(s.get("Id"))
|
|
return self.stack_ids
|
|
else:
|
|
for s in self.stacks:
|
|
# print(s)
|
|
match_by_id = (
|
|
stack is not None
|
|
and s.get("Id") == stack
|
|
and endpoint_id == s.get("EndpointId")
|
|
)
|
|
|
|
match_by_name = str(s.get("Name")) == str(stack) and endpoint_id == int(
|
|
s.get("EndpointId")
|
|
) # Ensure types match for comparison
|
|
|
|
if match_by_id or match_by_name:
|
|
# if (stack is not None and s.get("Id") == stack and endpoint_id == s.get("EndpointId"))
|
|
# or str(s.get("Name")) == str(stack) and endpoint_id == int(s.get("EndpointId")):
|
|
self.stack_id = s.get("Id")
|
|
self.stack_name = s.get("Name")
|
|
self.stack_ids.append(s.get("Id"))
|
|
return s
|
|
RED = "\033[91m"
|
|
RESET = "\033[0m"
|
|
print(ValueError(f"{RED}✗{RESET} >> Stack not found: {stack}"))
|
|
return 1
|
|
|
|
def create_stack(
|
|
self,
|
|
endpoint,
|
|
stacks=None,
|
|
mode="git",
|
|
autostart=False,
|
|
stack_mode="swarm",
|
|
):
|
|
for stack in stacks:
|
|
if stack_mode == "swarm":
|
|
swarm_id = self.get_swarm_id(endpoint)
|
|
p = "swarm"
|
|
env_path = f"{self.repo_dir}/__swarm/{stack}/.env"
|
|
else:
|
|
p = "standalone"
|
|
env_path = f"{self.repo_dir}/{stack}/.env"
|
|
# input(swarm_id)
|
|
self.endpoint_id = self.get_endpoint_id(endpoint)
|
|
if os.path.exists(self.repo_dir):
|
|
shutil.rmtree(self.repo_dir)
|
|
else:
|
|
print(f"Folder '{self.repo_dir}' does not exist.")
|
|
Repo.clone_from(self.git_url, self.repo_dir)
|
|
if mode == "git":
|
|
path = f"/stacks/create/{p}/repository"
|
|
# print(p)
|
|
if self.endpoint_id is not None:
|
|
path += f"?endpointId={self.endpoint_id}"
|
|
|
|
if stack == "all":
|
|
if self.endpoint_name == "rack":
|
|
stacks = self.rack_stacks
|
|
elif self.endpoint_name == "m-server":
|
|
stacks = self.m_server_stacks
|
|
elif self.endpoint_name == "rpi5":
|
|
stacks = self.rpi5_stacks
|
|
elif self.endpoint_name == "nas":
|
|
stacks = self.nas_stacks
|
|
else:
|
|
stacks = [stack]
|
|
# print(json.dumps(self.stacks_all, indent=2))
|
|
# input(json.dumps(self.stacks_all, indent=2))
|
|
for stack in stacks:
|
|
if self.endpoint_id in self.stacks_all:
|
|
|
|
# Check if the stack exists by ID or name
|
|
stack_check = (
|
|
stack in self.stacks_all[self.endpoint_id]["by_id"]
|
|
or stack in self.stacks_all[self.endpoint_id]["by_name"]
|
|
)
|
|
if stack_check:
|
|
GREEN = "\033[92m"
|
|
RESET = "\033[0m"
|
|
print(f"{GREEN}✓{RESET} >> Stack {stack} already exist")
|
|
continue
|
|
print(f"Working on {stack} , stack mode: {stack_mode}")
|
|
|
|
envs = []
|
|
if os.path.exists(f"{env_path}"):
|
|
f = open(f"{env_path}", "r")
|
|
env_vars = f.read().splitlines()
|
|
for ev in env_vars:
|
|
if ev.startswith("#") or ev.strip() == "":
|
|
continue
|
|
if "=" in ev:
|
|
name, value = ev.split("=", 1)
|
|
envs.append({"name": name, "value": value})
|
|
f.close()
|
|
# wl(envs)
|
|
for e in envs:
|
|
# print(f"Env: {e['name']} = {e['value']}")
|
|
HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
|
|
if e["name"] == "RESTART" and self.endpoint_name == "m-server":
|
|
e["value"] = "always"
|
|
if e["name"] in HWS:
|
|
# print("Found HW_MODE env var.")
|
|
if self.hw_mode:
|
|
e["value"] = "hw"
|
|
else:
|
|
e["value"] = "cpu"
|
|
if e["name"] == "LOGGING":
|
|
# print("Found LOGGING env var.")
|
|
if self.log_mode:
|
|
e["value"] = "journald"
|
|
else:
|
|
e["value"] = "syslog"
|
|
|
|
uid = uuid.uuid4()
|
|
# print(uid)
|
|
req = {
|
|
"Name": stack,
|
|
"Env": envs,
|
|
"AdditionalFiles": [],
|
|
"AutoUpdate": {
|
|
"forcePullImage": True,
|
|
"forceUpdate": False,
|
|
"webhook": f"{uid}",
|
|
},
|
|
"repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git",
|
|
"ReferenceName": "refs/heads/main",
|
|
"composeFile": f"{stack}/docker-compose.yml",
|
|
"ConfigFilePath": f"{stack}/docker-compose.yml",
|
|
"repositoryAuthentication": True,
|
|
"repositoryUsername": "jaydee",
|
|
"repositoryPassword": "glpat-uj-n-eEfTY398PE4vKSS",
|
|
"AuthorizationType": 0,
|
|
"TLSSkipVerify": False,
|
|
"supportRelativePath": True,
|
|
"repositoryAuthentication": True,
|
|
"fromAppTemplate": False,
|
|
"registries": [6, 3],
|
|
"FromAppTemplate": False,
|
|
"Namespace": "",
|
|
"CreatedByUserId": "",
|
|
"Webhook": "",
|
|
"filesystemPath": "/share/docker_data/portainer/portainer-data/",
|
|
"RegistryID": 4,
|
|
"isDetachedFromGit": True,
|
|
"method": "repository",
|
|
"swarmID": None,
|
|
}
|
|
if stack_mode == "swarm":
|
|
req["type"] = "swarm"
|
|
req["swarmID"] = swarm_id
|
|
req["composeFile"] = f"__swarm/{stack}/{stack}-swarm.yml"
|
|
req["ConfigFilePath"] = f"__swarm/{stack}/{stack}-swarm.yml"
|
|
if self._debug:
|
|
print(json.dumps(req))
|
|
res = self._api_post(path, req)
|
|
if "Id" in res:
|
|
# print("Deploy request OK")
|
|
pass
|
|
else:
|
|
print(res)
|
|
tries = 0
|
|
created = False
|
|
while True:
|
|
try:
|
|
# print(self.endpoint_id)
|
|
# print(stack)
|
|
if self.get_stack(stack, self.endpoint_id) != 1:
|
|
created = True
|
|
break
|
|
except Exception as e:
|
|
print(
|
|
f"Waiting for stack {stack} to be created...{tries}/50",
|
|
end="\r",
|
|
)
|
|
time.sleep(10)
|
|
tries += 1
|
|
if tries > 50:
|
|
print(
|
|
f"Error retrieving stack {stack} after creation: {self.endpoint_name}"
|
|
)
|
|
break
|
|
logger.debug(f"Exception while getting stack {stack}: {e}")
|
|
|
|
if created:
|
|
if stack != "pihole":
|
|
# print(autostart)
|
|
if not autostart:
|
|
# self.get_stacks()
|
|
# self.stop_stack(stack,self.endpoint_id)
|
|
conts = self.get_containers(self.endpoint_name, stack)
|
|
# print(conts)
|
|
self.stop_containers(self.endpoint_name, conts)
|
|
|
|
if mode == "file":
|
|
print("Creating new stack from file...")
|
|
path = "/stacks/create/standalone/file"
|
|
if self.endpoint_id is not None:
|
|
path += f"?endpointId={self.endpoint_id}"
|
|
|
|
if stack == "all":
|
|
if self.endpoint_name == "rack":
|
|
stacks = self.rack_stacks
|
|
elif self.endpoint_name == "m-server":
|
|
stacks = self.m_server_stacks
|
|
elif self.endpoint_name == "rpi5":
|
|
stacks = self.rpi5_stacks
|
|
else:
|
|
stacks = [stack]
|
|
for stack in stacks:
|
|
print(f"Working on {stack}")
|
|
if os.path.exists(f"{self.repo_dir}/{stack}/.env"):
|
|
f = open(f"{self.repo_dir}/{stack}/.env", "r")
|
|
|
|
env_vars = f.read().splitlines()
|
|
envs = []
|
|
for ev in env_vars:
|
|
if ev.startswith("#") or ev.strip() == "":
|
|
continue
|
|
if "=" in ev:
|
|
name, value = ev.split("=", 1)
|
|
envs.append({"name": name, "value": value})
|
|
f.close()
|
|
# wl(envs)
|
|
for e in envs:
|
|
# print(f"Env: {e['name']} = {e['value']}")
|
|
HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
|
|
if e["name"] == "RESTART" and self.endpoint_name == "m-server":
|
|
e["value"] = "always"
|
|
if e["name"] in HWS:
|
|
print("Found HW_MODE env var.")
|
|
if self.hw_mode:
|
|
e["value"] = "hw"
|
|
else:
|
|
e["value"] = "cpu"
|
|
if e["name"] == "LOGGING":
|
|
print("Found LOGGING env var.")
|
|
if self.log_mode:
|
|
e["value"] = "journald"
|
|
else:
|
|
e["value"] = "syslog"
|
|
|
|
file = {
|
|
# ("filename", file_object)
|
|
"file": (
|
|
"docker-compose.yml",
|
|
open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb"),
|
|
),
|
|
}
|
|
self._api_post_file(path, self.endpoint_id, stack, envs, file)
|
|
|
|
def print_stacks(self, endpoint="all"):
|
|
"""Print a table of stacks, optionally filtered by endpoint."""
|
|
stacks = self.get_stacks()
|
|
count = 0
|
|
data = []
|
|
stack_names = []
|
|
for stack in stacks:
|
|
if endpoint is not None:
|
|
if not stack["EndpointId"] in self.endpoints["by_id"]:
|
|
continue
|
|
if endpoint != "all":
|
|
if self.endpoints["by_name"][endpoint] != stack["EndpointId"]:
|
|
continue
|
|
try:
|
|
stack_names.append(stack["Name"])
|
|
data.append(
|
|
[
|
|
stack["Id"],
|
|
stack["Name"],
|
|
self.endpoints["by_id"][stack["EndpointId"]],
|
|
]
|
|
)
|
|
except KeyError as e:
|
|
data.append([stack["Id"], stack["Name"], "?"])
|
|
logger.debug(
|
|
"KeyError getting endpoint name for stack %s : %s", stack["Name"], e
|
|
)
|
|
count += 1
|
|
|
|
headers = ["StackID", "Name", "Endpoint"]
|
|
print(tabulate.tabulate(data, headers=headers, tablefmt="github"))
|
|
print(f"Total stacks: {count}")
|
|
# print(sorted(stack_names))
|
|
|
|
def update_service(self):
|
|
all_services = self.get_services(self.get_endpoint_id(self.args.endpoint_id))
|
|
|
|
service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services]
|
|
service_tuples = sorted(service_tuples, key=lambda x: x[1])
|
|
service_dict = dict(service_tuples)
|
|
# input(service_tuples)
|
|
if self.args.service_id is None:
|
|
#services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
|
|
service_tuples.insert(0, ("__ALL__", "[Select ALL]"))
|
|
service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]"))
|
|
service_ids = checkboxlist_dialog(
|
|
title="Select one service",
|
|
text="Choose a service:",
|
|
values=service_tuples
|
|
).run()
|
|
elif self.args.service_id == "all":
|
|
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
|
|
else:
|
|
service_ids = [self.args.service_id]
|
|
if "__ONLY_CHECK__" in service_ids or self.args.update is False:
|
|
pull = False
|
|
print("Checking for updates only...")
|
|
else:
|
|
print("Checking for updates and pulling updates...")
|
|
pull = True
|
|
if "__ALL__" in service_ids:
|
|
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
|
|
|
|
longest = 0
|
|
for a in service_dict.items():
|
|
# print(a[1])
|
|
if len(a[1]) > longest:
|
|
longest = len(a[1])
|
|
#print(longest)
|
|
ok = "\033[92m✔\033[0m"
|
|
err = "\033[91m✖\033[0m"
|
|
for service_id in service_ids:
|
|
print("\033[?25l", end="")
|
|
print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True)
|
|
path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true"
|
|
|
|
try:
|
|
resp = self._api_get(path, timeout=20)
|
|
except ValueError as e:
|
|
print(f"Error restarting service: {e}")
|
|
return []
|
|
|
|
if resp['Status'] == "outdated":
|
|
if pull:
|
|
self.restart_srv(service_id, pull)
|
|
#print(f"Service {service_dict[service_id]:<{longest}} : updated")
|
|
self.gotify_message(f"Service {service_dict[service_id]} updated")
|
|
print(ok)
|
|
else:
|
|
print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True)
|
|
#print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m")
|
|
self.gotify_message(f"Service update available for {service_dict[service_id]}")
|
|
print(err)
|
|
else:
|
|
print(ok)
|
|
print("\033[?25h", end="")
|
|
return True
|
|
|
|
def restart_srv(self,service_id, pool=False):
|
|
"""Restart a service on an endpoint."""
|
|
path = f"/endpoints/{self.endpoint_id}/forceupdateservice"
|
|
params={"serviceID": service_id, "pullImage": pool}
|
|
try:
|
|
resp = self._api_put(path, json=params, timeout=20)
|
|
print(resp)
|
|
except ValueError as e:
|
|
print(f"Error restarting service: {e}")
|
|
return []
|
|
|
|
def restart_service(self, endpoint_id, service_id):
|
|
stacks = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
|
|
stack_id = radiolist_dialog(
|
|
title="Select one service",
|
|
text="Choose a service:",
|
|
values=stacks
|
|
).run()
|
|
service_dict = dict(stacks)
|
|
services = self.get_services(self.endpoint_name, stack_id)
|
|
svc_name = service_dict.get(stack_id)
|
|
stack_svcs = []
|
|
svc_menu = []
|
|
for s in services:
|
|
try:
|
|
if svc_name in s['Spec']['Name']:
|
|
stack_svcs.append([s['Version']['Index'], s['Spec']['Name']])
|
|
svc_menu.append([s['ID'], s['Spec']['Name']])
|
|
except KeyError as e:
|
|
print(e)
|
|
|
|
|
|
service_id = radiolist_dialog(
|
|
title="Select one service",
|
|
text="Choose a service:",
|
|
values=svc_menu
|
|
).run()
|
|
|
|
self.restart_srv(service_id, False)
|
|
|
|
print(f"Service {service_id} : restarted")
|
|
return True
|
|
|
|
def start_stack(self, stack=None, endpoint_id=None):
|
|
"""Start one stack or all stacks on an endpoint."""
|
|
if endpoint_id is not None:
|
|
print("Getting endpoint")
|
|
self.get_endpoint(endpoint_id)
|
|
if stack is not None:
|
|
for s in stack:
|
|
self.stack_ids = [self._resolve_stack_id(s, endpoint_id)]
|
|
for stck in self.stack_ids:
|
|
path = f"/stacks/{stck}/start"
|
|
if self.endpoint_id is not None:
|
|
path += f"?endpointId={self.endpoint_id}"
|
|
try:
|
|
resp = self._api_post_no_body(path, timeout=20)
|
|
except ValueError as e:
|
|
print(f"Error stoping stack: {e}")
|
|
return []
|
|
if "Id" in json.loads(resp):
|
|
print(
|
|
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : started"
|
|
)
|
|
else:
|
|
print(
|
|
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : {json.loads(resp)['message']}"
|
|
)
|
|
return True
|
|
|
|
def stop_stack(self, stack, endpoint_id):
|
|
|
|
"""Stop one stack or all stacks on an endpoint."""
|
|
print(f"Stopping stack {stack}")
|
|
|
|
if endpoint_id is not None:
|
|
self.get_endpoint(endpoint_id)
|
|
|
|
if stack is not None:
|
|
for s in stack:
|
|
self.stack_ids = [self._resolve_stack_id(s, endpoint_id)]
|
|
# print(self.stack_ids)
|
|
for stck in self.stack_ids:
|
|
path = f"/stacks/{stck}/stop"
|
|
# print(path)
|
|
if self.endpoint_id is not None:
|
|
path += f"?endpointId={self.endpoint_id}"
|
|
try:
|
|
resp = self._api_post_no_body(path, timeout=120)
|
|
except NameError as e:
|
|
print(f"Error stopping stack: {e}")
|
|
return []
|
|
if "Id" in json.loads(resp):
|
|
print(
|
|
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : stopped"
|
|
)
|
|
else:
|
|
print(
|
|
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : {json.loads(resp)['message']}"
|
|
)
|
|
return True
|
|
|
|
def _resolve_endpoint(self, endpoint_id):
|
|
|
|
self.get_endpoints()
|
|
if self._debug:
|
|
print(endpoint_id)
|
|
print(self.endpoints)
|
|
if self._is_number(endpoint_id):
|
|
self.endpoint_id = int(endpoint_id)
|
|
self.endpoint_name = self.endpoints["by_id"][self.endpoint_id]
|
|
else:
|
|
self.endpoint_name = endpoint_id
|
|
self.endpoint_id = int(self.endpoints["by_name"][endpoint_id])
|
|
|
|
def _resolve_stack_id(self, stack, endpoint_id):
|
|
if stack == "all":
|
|
return "all"
|
|
|
|
if not self._is_number(stack):
|
|
result = self.get_stack(stack, endpoint_id)
|
|
return result["Id"]
|
|
|
|
return int(stack)
|
|
|
|
def _delete_all_stacks(self, endpoint_id):
|
|
stacks = self.get_stacks(endpoint_id)
|
|
paths = []
|
|
|
|
for s in stacks:
|
|
if int(s["EndpointId"]) != int(endpoint_id):
|
|
continue
|
|
|
|
path = f"/stacks/{s['Id']}?endpointId={endpoint_id}&removeVolumes=true"
|
|
paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path])
|
|
|
|
def delete_item(item):
|
|
print(f"Delete stack {item[1]} from {item[0]}")
|
|
out = self._api_delete(item[2])
|
|
logger.debug("Deleted stack %s from %s: %s", item[1], item[0], out)
|
|
|
|
with ThreadPoolExecutor(max_workers=10) as exe:
|
|
exe.map(delete_item, paths)
|
|
|
|
return "Done"
|
|
|
|
def _delete_single_stack(self, stack_id, endpoint_id):
|
|
path = f"/stacks/{stack_id}?endpointId={endpoint_id}&removeVolumes=true"
|
|
# print(path)
|
|
try:
|
|
out = self._api_delete(path,timeout=240)
|
|
except ValueError as e:
|
|
msg = str(e)
|
|
if "Conflict for url" in msg:
|
|
print("Stack with this name may already exist.")
|
|
else:
|
|
print(f"Error deleting stack: {e}")
|
|
return []
|
|
|
|
return out or []
|
|
|
|
def delete_stack(self, endpoint_id=None, stack=None):
|
|
"""Delete one stack or all stacks on an endpoint."""
|
|
self._resolve_endpoint(endpoint_id)
|
|
endpoint_id = self.endpoint_id
|
|
|
|
if stack == "all":
|
|
return self._delete_all_stacks(endpoint_id)
|
|
else:
|
|
for s in stack:
|
|
print(f" >> Deleting stack {s} from endpoint {self.endpoint_name}")
|
|
stack_id = self._resolve_stack_id(s, endpoint_id)
|
|
self._delete_single_stack(stack_id, endpoint_id)
|
|
return "Done"
|
|
|
|
# def delete_stack(self, endpoint_id=None, stack=None):
|
|
# """
|
|
# Return a list of stacks. If endpoint_id is provided, it will be added as a query param.
|
|
# """
|
|
# self.get_endpoints()
|
|
# if self._is_number(endpoint_id):
|
|
# self.endpoint_name = self.endpoints["by_id"][endpoint_id]
|
|
# self.endpoint_id = endpoint_id
|
|
# else:
|
|
# self.endpoint_name = endpoint_id
|
|
# self.endpoint_id = self.endpoints["by_name"][endpoint_id]
|
|
|
|
# if not self._is_number(endpoint_id):
|
|
# endpoint_id = int(self.endpoints["by_name"][endpoint_id])
|
|
|
|
# if not self._is_number(stack) and stack != "all":
|
|
# # print(stack)
|
|
# # print(self.endpoint_id)
|
|
# stack = self.get_stack(stack, self.endpoint_id)["Id"]
|
|
# if stack == "all":
|
|
# stacks = self.get_stacks(self.endpoint_id)
|
|
# paths = []
|
|
# for s in stacks:
|
|
# # print(f"Delete stack {s['Name']}")
|
|
# # print(s['EndpointId'], endpoint_id)
|
|
# if int(s["EndpointId"]) != int(endpoint_id):
|
|
# continue
|
|
# # print("Deleting stack:", s['Name'])
|
|
# path = f"/stacks/{s['Id']}"
|
|
# if endpoint_id is not None:
|
|
# path += f"?endpointId={endpoint_id}&removeVolumes=true"
|
|
# paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path])
|
|
# # input(paths)
|
|
|
|
# def delete(c):
|
|
# print(f"Delete stack {c[1]} from {c[0]} ")
|
|
# out = self._api_delete(c[2])
|
|
# logger.debug(f"Deleted stack {c[1]} from {c[0]}: {out}")
|
|
|
|
# with ThreadPoolExecutor(max_workers=10) as exe:
|
|
# exe.map(delete, paths)
|
|
# return "Done"
|
|
# else:
|
|
# path = f"/stacks/{stack}"
|
|
|
|
# if endpoint_id is not None:
|
|
# path += f"?endpointId={endpoint_id}&removeVolumes=true"
|
|
# # print(path)
|
|
# try:
|
|
# # print(path)
|
|
# # print(base_url)
|
|
# # print(token)
|
|
# stacks = self._api_delete(path)
|
|
# except Exception as e:
|
|
# # print(f"Error creating stack: {e}")
|
|
# if "Conflict for url" in str(e):
|
|
# print("Stack with this name may already exist.")
|
|
# else:
|
|
# print(f"Error deleting stack: {e}")
|
|
# # print(stacks)
|
|
# return []
|
|
# if stacks is None:
|
|
# return []
|
|
|
|
# return stacks
|
|
|
|
def create_secret(self, name, value, endpoint_id=None, timeout=None):
|
|
"""Create a Docker secret on the specified endpoint."""
|
|
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
|
|
path = f"/endpoints/{endpoint_id}/docker/secrets/create"
|
|
encoded = base64.b64encode(value.encode()).decode()
|
|
data = {"Name": name, "Data": encoded}
|
|
return self._api_post(path, data, timeout=timeout)
|