'''Portainer API wrapper module.'''

import os
from concurrent.futures import ThreadPoolExecutor
import json
import sys
import uuid
import shutil
import time
import logging
import base64
import tabulate
from git import Repo
import requests
import hvac
from prompt_toolkit import prompt
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.shortcuts import checkboxlist_dialog
from prompt_toolkit.shortcuts import radiolist_dialog

logger = logging.getLogger(__name__)


class PortainerApi:
    """
    Simple wrapper around the module-level Portainer helper functions.
    Instantiate with base_url and optional token/timeout and call methods
    to perform API operations.
    """

    def __init__(self, site, args=None, timeout=120):
        """Initialise the wrapper and eagerly cache endpoint/stack/container data.

        :param site: which Portainer instance to talk to ("portainer" or "port").
        :param args: parsed CLI namespace; expected to expose ``endpoint_id``
            and an hvac ``client`` when ``site`` selects a Vault-backed token.
        :param timeout: default request timeout in seconds.
        """
        self.base_url = None
        self.token = None
        self.args = args
        self.action = None
        self._debug = False
        self.timeout = timeout
        # SSH remote used for cloning (an HTTPS variant existed as a
        # commented-out alternative in earlier revisions).
        self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
        self.stack_name = None
        self.stacks_all = {}
        self.stack_id = None
        self.stack_ids = []
        self.endpoint_name = None
        # BUGFIX: args defaults to None, so do not dereference it blindly.
        self.endpoint_id = args.endpoint_id if args is not None else None
        self.repo_dir = "/tmp/docker-compose"
        # Stacks deployed on every host.
        self.basic_stacks = [
            "pihole",
            "nginx",
            "mosquitto",
            "webhub",
            "authentik",
            "bitwarden",
            "mailu3",
            "home-assistant",
            "homepage",
        ]
        self.nas_stacks = self.basic_stacks + [
            "gitlab",
            "bookstack",
            "dockermon",
            "gitea",
            "grafana",
            "immich",
            "jupyter",
            "kestra",
            "mealie",
        ]
        self.m_server_stacks = self.basic_stacks + [
            "immich",
            "zabbix-server",
            "gitea",
            "unifibrowser",
            "mediacenter",
            "watchtower",
            "wazuh",
            "octoprint",
            "motioneye",
            "kestra",
            "bookstack",
            "wud",
            "uptime-kuma",
            "registry",
            "regsync",
            "dockermon",
            "grafana",
            "nextcloud",
            "semaphore",
            "node-red",
            "test",
            "jupyter",
            "paperless",
            "mealie",
            "n8n",
            "ollama",
            "rancher",
        ]
        self.rpi5_stacks = self.basic_stacks + ["gitlab", "bookstack", "gitea"]
        # NOTE(review): identical to nas_stacks today — kept as a separate
        # attribute because callers reference it by name.
        self.rack_stacks = self.basic_stacks + [
            "gitlab",
            "bookstack",
            "dockermon",
            "gitea",
            "grafana",
            "immich",
            "jupyter",
            "kestra",
            "mealie",
        ]
        self.log_mode = False
        self.hw_mode = False
        self.all_data = {"containers": {}, "stacks": {}, "endpoints": {}, "services": {}}
        # get_site() already refreshes endpoints and stacks; calling
        # get_endpoints()/get_stacks() again here (as the old code did)
        # only duplicated two network round-trips.
        self.get_site(site)
        self.refresh_in_containers()

    def set_defaults(self, config):
        '''Set default configuration from provided config dictionary.'''
        self.cur_config = config

    def get_site(self, site):
        '''Select the Portainer instance and load its API token.

        Known sites read the token from Vault via ``self.args.client``;
        anything else falls back to the default instance. Refreshes the
        endpoint and stack caches afterwards.
        '''
        if site == "portainer":
            self.base_url = os.getenv(
                "PORTAINER_URL", "https://portainer.sectorq.eu/api"
            )
            token_path = "portainer/token"
            self.token = self.args.client.secrets.kv.v2.read_secret_version(path=token_path)['data']['data']['value']
        elif site == "port":
            self.base_url = os.getenv("PORTAINER_URL", "https://port.sectorq.eu/api")
            token_path = "port/token"
            self.token = self.args.client.secrets.kv.v2.read_secret_version(path=token_path)['data']['data']['value']
        else:
            self.base_url = os.getenv(
                "PORTAINER_URL", "https://portainer.sectorq.eu/api"
            )
            # SECURITY(review): hard-coded API token fallback committed to the
            # repo — rotate this credential and source it from Vault instead.
            self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc="
        self.get_endpoints()
        self.get_stacks()

    def _is_number(self, s):
        """Check if the input string is a number."""
        try:
            float(s)
            return True
        except (ValueError, TypeError):
            # TypeError covers None and other non-castable inputs, which the
            # endpoint-id helpers may legitimately pass through.
            return False

    def gotify_message(self, message):
        '''Send a notification message via Gotify (best-effort).'''
        payload = {
            "title": "Updates in Portainer",
            "message": message,
            "priority": 5
        }
        try:
            response = requests.post(
                "https://gotify.sectorq.eu/message",
                data=payload,
                headers={"X-Gotify-Key": "ASn_fIAd5OVjm8c"},
                # BUGFIX: no timeout meant a hung Gotify server blocked forever.
                timeout=self.timeout,
            )
            logger.debug(response.text)
        except requests.RequestException as e:
            # Notifications are auxiliary; never let them break the caller.
            logger.warning("Gotify notification failed: %s", e)
logger.debug(response.text) + # print("Status:", response.status_code) + # print("Response:", response.text) + pass + + def _api_get(self, path, timeout=120): + url = f"{self.base_url.rstrip('/')}{path}" + headers = {"X-API-Key": f"{self.token}"} + resp = requests.get(url, headers=headers, timeout=timeout) + if resp.status_code != 200: + return resp.status_code + print(f"Error: {resp.status_code} - {resp.text}") + # resp.raise_for_status() + return resp.json() + + def _api_post(self, path, json="", timeout=120): + url = f"{self.base_url.rstrip('/')}{path}" + headers = {"X-API-Key": f"{self.token}"} + # print(url) + # print(json) + resp = requests.post(url, headers=headers, json=json, timeout=timeout) + return resp.text + + def _api_put(self, path, json="", timeout=120): + url = f"{self.base_url.rstrip('/')}{path}" + headers = {"X-API-Key": f"{self.token}"} + # print(url) + # print(json) + resp = requests.put(url, headers=headers, json=json, timeout=timeout) + return resp.text + + def _api_post_file(self, path, endpoint_id, name, envs, file, timeout=120): + # input("API POST2 called. 
Press Enter to continue.") + """Example authenticated GET request to Portainer API.""" + url = f"{self.base_url.rstrip('/')}{path}" + headers = {"X-API-Key": f"{self.token}"} + data = {"EndpointId": endpoint_id, "Name": name, "Env": json.dumps(envs)} + # print(data) + resp = requests.post( + url, headers=headers, files=file, data=data, timeout=timeout + ) + resp.raise_for_status() + return resp.json() + + def _api_post_no_body(self, path, timeout=120): + """Example authenticated GET request to Portainer API.""" + url = f"{self.base_url.rstrip('/')}{path}" + # print(url) + headers = {"X-API-Key": f"{self.token}"} + resp = requests.post(url, headers=headers, timeout=timeout) + return resp.text + + def _api_delete(self, path, timeout=120): + """Example authenticated DELETE request to Portainer API.""" + url = f"{self.base_url.rstrip('/')}{path}" + headers = {"X-API-Key": f"{self.token}"} + resp = requests.delete(url, headers=headers, timeout=timeout) + # print(resp) + resp.raise_for_status() + # print(resp.status_code) + return resp.status_code + + def refresh(self): + '''Refresh all data from Portainer.''' + self.get_endpoints() + self.get_stacks(self) + self.get_containers(self) + return True + + def get_stacks(self, endpoint_id="all", timeout=20): + '''Get a list of stacks for a specific endpoint or all endpoints.''' + if endpoint_id != "all": + endpoint_id = self.get_endpoint_id() + path = "/stacks" + stcks = [] + stacks = self._api_get(path, timeout=timeout) + self.stacks_all = {} + fail_endponts = [20, 39, 41] + # print(json.dumps(stacks,indent=2)) + webhooks = {} + for s in stacks: + # print(type(s["AutoUpdate"]) ) + # input(s) + if s["EndpointId"] in fail_endponts: + continue + if not s["EndpointId"] in webhooks: + try: + webhooks[s["EndpointId"]] = {"webhook": {}} + webhooks[self.endpoints["by_id"][s["EndpointId"]]] = {"webhook": {}} + except Exception as e: + logger.debug( + f"Exception while getting webhooks for endpoint {s['EndpointId']}: {e}" + ) + if not 
s["EndpointId"] in self.stacks_all: + self.stacks_all[s["EndpointId"]] = {"by_id": {}, "by_name": {}} + self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]] = { + "by_id": {}, + "by_name": {}, + } + self.stacks_all[s["EndpointId"]]["by_id"][s["Id"]] = s["Name"] + self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_id"][ + s["Id"] + ] = s["Name"] + + self.stacks_all[s["EndpointId"]]["by_name"][s["Name"]] = s["Id"] + self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_name"][ + s["Name"] + ] = s["Id"] + # print(s) + + if "AutoUpdate" in s and s["AutoUpdate"] is not None: + if type(s["AutoUpdate"]) is dict and "Webhook" in s["AutoUpdate"]: + # print(self.endpoints["by_id"][s['EndpointId']], s['Name'], s["AutoUpdate"]['Webhook']) + # print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW") + webhooks[s["EndpointId"]][s["Name"]] = s["AutoUpdate"]["Webhook"] + webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[ + "AutoUpdate" + ]["Webhook"] + elif s["AutoUpdate"]["Webhook"] != "": + webhooks[s["EndpointId"]][s["Name"]] = s["Webhook"] + webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[ + "Webhook" + ] + + # print(self.stacks_all) + if s["EndpointId"] == endpoint_id or endpoint_id == "all": + stcks.append(s) + # print(stcks) + if stcks is None: + return [] + self.stacks = stacks + self.all_data["stacks"] = self.stacks_all + self.all_data["webhooks"] = webhooks + # input(json.dumps(self.stacks_all,indent=2)) + return stcks + + def get_services(self, endpoint, timeout=30): + '''Get a list of services for a specific stack on an endpoint.''' + # print(json.dumps(self.all_data,indent=2)) + path = f"/endpoints/{self.get_endpoint_id()}/docker/services" + # print(path) + # path += f'?filters={{"label": ["com.docker.compose.project={stack}"]}}' + services = self._api_get(path, timeout=timeout) + return services + + def update_status(self, endpoint, stack): + '''Get the update status of a specific stack on an endpoint.''' + path = 
f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true" + # input(path) + stats = self._api_get(path) + print(stats) + + def get_endpoint_id(self): + '''Get endpoint ID from either ID or name input.''' + # input(self.args.endpoint_id) + if self._is_number(self.args.endpoint_id): + self.endpoint_id = self.args.endpoint_id + self.endpoint_name = self.endpoints["by_id"][self.args.endpoint_id] + return self.args.endpoint_id + else: + self.endpoint_name = self.args.endpoint_id + self.endpoint_id = self.endpoints["by_name"][self.args.endpoint_id] + return self.endpoints["by_name"][self.args.endpoint_id] + + def get_endpoint_name(self, endpoint): + '''Get endpoint name from either ID or name input.''' + if self._is_number(endpoint): + self.endpoint_id = endpoint + self.endpoint_name = self.all_data["endpoints"]["by_id"][endpoint] + return self.all_data["endpoints"]["by_id"][endpoint] + else: + self.endpoint_name = endpoint + self.endpoint_id = self.all_data["endpoints"]["by_name"][endpoint] + return endpoint + + def refresh_in_containers(self): + '''Get a list of containers for a specific endpoint and stack.''' + # print(json.dumps(self.all_data,indent=2)) + # print(endpoint) + # print(stack) + cont = [] + data = {} + + eps = [ep for ep in self.all_data['endpoints']['by_id'].keys()] + #input(eps) + for endpoint in eps: + if self.all_data["endpoints_status"][endpoint] != 1: + print("Endpoint down") + # print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline") + continue + path = ( + f"/endpoints/{endpoint}/docker/containers/json?all=1" + ) + logging.info(f"request : {path}") + try: + containers = self._api_get(path) + #input(json.dumps(containers, indent=2)) + except Exception as e: + print(f"failed to get containers from {path}: {e}") + continue + contr = [] + try: + for c in containers: + #input(c) + cont.append([c["Names"][0].replace("/", ""),c["Id"], c['Image']]) + contr.append([c["Names"][0].replace("/", ""), c["Id"], 
c['Image']]) + if self.all_data["endpoints"]["by_id"][endpoint] in data: + data[self.all_data["endpoints"]["by_id"][endpoint]] = contr + data[endpoint] = contr + else: + data[self.all_data["endpoints"]["by_id"][endpoint]] = contr + data[endpoint] = contr + except Exception as e: + logger.debug( + f"Exception while getting containers for stack {e} ", + f"on endpoint {self.all_data['endpoints']['by_id'][endpoint]}: {e}", + ) + self.all_data["containers"] = data + + #print(cont) + return cont + + def get_containers(self): + '''Get a list of containers for a specific endpoint and stack.''' + # print(json.dumps(self.all_data,indent=2)) + # print(endpoint) + # print(stack) + cont = [] + data = {} + if self.args.endpoint_id == "all": + eps = [ep for ep in self.all_data['endpoints']['by_id'].keys()] + else: + + eps = [self.get_endpoint_id()] + + for endpoint in eps: + #print(self.args.stack) + if self.args.stack in ["all", None]: + # input([id for id in self.all_data["stacks"][endpoint]['by_id'].keys()]) + for e in [id for id in self.all_data["stacks"][endpoint]['by_name'].keys()]: + #input(e) + # if s not in self.all_data["stacks"]: + # continue + #input(self.all_data) + if self.all_data["endpoints_status"][endpoint] != 1: + # print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline") + continue + # input(self.all_data["stacks"][endpoint]["by_name"]) + + #input(e) + path = ( + f"/endpoints/{endpoint}/docker/containers/json" + f'?all=1&filters={{"label": ["com.docker.compose.project={e}"]}}' + ) + logging.info(f"request : {path}") + try: + containers = self._api_get(path) + #input(containers) + except Exception as e: + print(f"failed to get containers from {path}: {e}") + continue + contr = [] + try: + for c in containers: + # input(c) + cont.append(c["Names"][0].replace("/", "")) + contr.append(c["Names"][0].replace("/", "")) + if self.all_data["endpoints"]["by_id"][endpoint] in data: + data[self.all_data["endpoints"]["by_id"][endpoint]][e] = contr + else: + 
data[self.all_data["endpoints"]["by_id"][endpoint]] = { + e: contr + } + except Exception as e: + logger.debug( + f"Exception while getting containers for stack {e} ", + f"on endpoint {self.all_data['endpoints']['by_id'][endpoint]}: {e}", + ) + + self.all_data["containers"] = data + + #print(cont) + return cont + + def stop_containers(self, endpoint, containers, timeout=130): + '''Stop containers on an endpoint.''' + if self.all_data["endpoints_status"][endpoint] != 1: + print(f"Endpoint {self.get_endpoint_name(endpoint)} is offline") + ep_id = self.endpoints["by_name"][endpoint] + + def stop(c): + print(f" > Stopping {c}") + self._api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/stop") + # print(f"✔") + + with ThreadPoolExecutor(max_workers=10) as exe: + exe.map(stop, containers) + # for c in containers: + # print(f" > Stopping {c}") + # self._api_post_no_body(f"/endpoints/{self.endpoints["by_name"][endpoint]}/docker/containers/{c}/stop") + # return 0 + + def start_containers(self, endpoint, containers, timeout=130): + '''Start containers on an endpoint.''' + ep_id = self.endpoints["by_name"][endpoint] + + def stop(c): + print(f" > Starting {c}") + self._api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/start") + + with ThreadPoolExecutor(max_workers=10) as exe: + exe.map(stop, containers) + + def update_stack(self, args): + '''Update one stack or all stacks on an endpoint.''' + #print("Updating stacks") + stacks = self.get_stacks(endpoint_id=args.endpoint_id) + stacks_tuples = [] + + for s in stacks: + #print(s) + try: + stacks_tuples.append((s['AutoUpdate']['Webhook'],s['Name'])) + # print(s['Name'], " : ", s['AutoUpdate']['Webhook']) + except: + stacks_tuples.append((s['Webhook'],s['Name'])) + # print(s['Name'], " : ", s['Webhook']) + stacks_dict = dict(stacks_tuples) + print(stacks_dict) + #input(stacks_tuples) + # stacks_tuples = [(s['AutoUpdate']['Webhook'], s['Name']) for s in stacks if "Webhook" in s['AutoUpdate'] ] + + def update(c): 
+ print(f" > Updating {c[1]} ") + ans = self._api_post_no_body(f"/stacks/webhooks/{c[0]}") + logger.debug( + f"Update response for stack {c[0]} on endpoint {ans}" + ) + # input(stacks_tuples) + if args.debug: + input(args) + stacks_tuples = sorted(stacks_tuples, key=lambda x: x[1]) + stack_dict = dict(stacks_tuples) + # input(service_tuples) + if self.args.service_id is None: + #services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] + stacks_tuples.insert(0, ("__ALL__", "[Select ALL]")) + stack_ids = checkboxlist_dialog( + title="Select one stack to update", + text="Choose a service:", + values=stacks_tuples + ).run() + stcs = [] + input(stack_ids) + + if args.stack == "all": + for s in stack_dict: + stcs.append([s, stack_dict[s]]) + else: + for s in stack_dict: + if s in stack_ids: + stcs.append([s, stack_dict[s]]) + + print(stcs) + with ThreadPoolExecutor(max_workers=10) as exe: + list(exe.map(update, stcs)) + + input('UPDATED') + if not args.autostart: + time.sleep(120) + cont = [] + for c in self.all_data["containers"][args.endpoint_id]: + if args.stack == c or args.stack == "all": + cont += self.all_data["containers"][args.endpoint_id][c] + self.stop_containers(args.endpoint_id, cont) + + def get_endpoints(self, timeout=10): + '''Get a list of all endpoints.''' + endpoints = self._api_get("/endpoints") + eps = {"by_id": {}, "by_name": {}} + eps_stats = {} + for ep in endpoints: + eps["by_id"][ep["Id"]] = ep["Name"] + eps["by_name"][ep["Name"]] = ep["Id"] + eps_stats[ep["Id"]] = ep["Status"] + eps_stats[ep["Name"]] = ep["Status"] + self.endpoints = eps + self.endpoints_names = list(eps["by_name"]) + self.all_data["endpoints"] = eps + self.all_data["endpoints_status"] = eps_stats + # input(eps_stats) + # input(eps) + return eps + + def get_endpoint(self, endpoint_id=None, timeout=30): + '''Get endpoint ID and name from either ID or name input.''' + self.get_endpoints() + # print(self.endpoints) + if self._is_number(endpoint_id): + 
self.endpoint_name = self.endpoints["by_id"][endpoint_id] + self.endpoint_id = endpoint_id + else: + self.endpoint_name = endpoint_id + self.endpoint_id = self.endpoints["by_name"][endpoint_id] + return self.endpoint_id + + def get_swarm_id(self, endpoint): + '''Get the swarm ID for a specific endpoint.''' + ep_id = self.endpoints["by_name"][endpoint] + path = f"/endpoints/{ep_id}/docker/info" + stats = self._api_get(path) + return stats["Swarm"]["Cluster"]["ID"] + + def get_stack(self, stack=None, endpoint_id=None, timeout=None): + self.get_stacks(endpoint_id) + if not self._is_number(endpoint_id): + endpoint_id = int(self.endpoints["by_name"][endpoint_id]) + self.stack_id = [] + if stack == "all": + for s in self.stacks: + # print(s) + if endpoint_id == s.get("EndpointId"): + self.stack_ids.append(s.get("Id")) + return self.stack_ids + else: + for s in self.stacks: + # print(s) + match_by_id = ( + stack is not None + and s.get("Id") == stack + and endpoint_id == s.get("EndpointId") + ) + + match_by_name = str(s.get("Name")) == str(stack) and endpoint_id == int( + s.get("EndpointId") + ) # Ensure types match for comparison + + if match_by_id or match_by_name: + # if (stack is not None and s.get("Id") == stack and endpoint_id == s.get("EndpointId")) + # or str(s.get("Name")) == str(stack) and endpoint_id == int(s.get("EndpointId")): + self.stack_id = s.get("Id") + self.stack_name = s.get("Name") + self.stack_ids.append(s.get("Id")) + return s + RED = "\033[91m" + RESET = "\033[0m" + print(ValueError(f"{RED}✗{RESET} >> Stack not found: {stack}")) + return 1 + + def create_stack( + self, + endpoint, + stacks=None, + mode="git", + autostart=False, + stack_mode="swarm", + ): + for stack in stacks: + if stack_mode == "swarm": + swarm_id = self.get_swarm_id(endpoint) + p = "swarm" + env_path = f"{self.repo_dir}/__swarm/{stack}/.env" + else: + p = "standalone" + env_path = f"{self.repo_dir}/{stack}/.env" + # input(swarm_id) + self.endpoint_id = self.get_endpoint_id() + if 
os.path.exists(self.repo_dir): + shutil.rmtree(self.repo_dir) + else: + print(f"Folder '{self.repo_dir}' does not exist.") + Repo.clone_from(self.git_url, self.repo_dir) + if mode == "git": + path = f"/stacks/create/{p}/repository" + # print(p) + if self.endpoint_id is not None: + path += f"?endpointId={self.endpoint_id}" + + if stack == "all": + if self.endpoint_name == "rack": + stacks = self.rack_stacks + elif self.endpoint_name == "m-server": + stacks = self.m_server_stacks + elif self.endpoint_name == "rpi5": + stacks = self.rpi5_stacks + elif self.endpoint_name == "nas": + stacks = self.nas_stacks + else: + stacks = [stack] + # print(json.dumps(self.stacks_all, indent=2)) + # input(json.dumps(self.stacks_all, indent=2)) + for stack in stacks: + if self.endpoint_id in self.stacks_all: + + # Check if the stack exists by ID or name + stack_check = ( + stack in self.stacks_all[self.endpoint_id]["by_id"] + or stack in self.stacks_all[self.endpoint_id]["by_name"] + ) + if stack_check: + GREEN = "\033[92m" + RESET = "\033[0m" + print(f"{GREEN}✓{RESET} >> Stack {stack} already exist") + continue + print(f"Working on {stack} , stack mode: {stack_mode}") + + envs = [] + if os.path.exists(f"{env_path}"): + f = open(f"{env_path}", "r") + env_vars = f.read().splitlines() + for ev in env_vars: + if ev.startswith("#") or ev.strip() == "": + continue + if "=" in ev: + name, value = ev.split("=", 1) + envs.append({"name": name, "value": value}) + f.close() + # wl(envs) + for e in envs: + # print(f"Env: {e['name']} = {e['value']}") + HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"] + if e["name"] == "RESTART" and self.endpoint_name == "m-server": + e["value"] = "always" + if e["name"] in HWS: + # print("Found HW_MODE env var.") + if self.hw_mode: + e["value"] = "hw" + else: + e["value"] = "cpu" + if e["name"] == "LOGGING": + # print("Found LOGGING env var.") + if self.log_mode: + e["value"] = "journald" + else: + e["value"] = "syslog" + + uid = uuid.uuid4() + # print(uid) + req = { + 
"Name": stack, + "Env": envs, + "AdditionalFiles": [], + "AutoUpdate": { + "forcePullImage": True, + "forceUpdate": False, + "webhook": f"{uid}", + }, + "repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git", + "ReferenceName": "refs/heads/main", + "composeFile": f"{stack}/docker-compose.yml", + "ConfigFilePath": f"{stack}/docker-compose.yml", + "repositoryAuthentication": True, + "repositoryUsername": "jaydee", + "repositoryPassword": "glpat-uj-n-eEfTY398PE4vKSS", + "AuthorizationType": 0, + "TLSSkipVerify": False, + "supportRelativePath": True, + "repositoryAuthentication": True, + "fromAppTemplate": False, + "registries": [6, 3], + "FromAppTemplate": False, + "Namespace": "", + "CreatedByUserId": "", + "Webhook": "", + "filesystemPath": "/share/docker_data/portainer/portainer-data/", + "RegistryID": 4, + "isDetachedFromGit": True, + "method": "repository", + "swarmID": None, + } + if stack_mode == "swarm": + req["type"] = "swarm" + req["swarmID"] = swarm_id + req["composeFile"] = f"__swarm/{stack}/{stack}-swarm.yml" + req["ConfigFilePath"] = f"__swarm/{stack}/{stack}-swarm.yml" + if self._debug: + print(json.dumps(req)) + res = self._api_post(path, req) + if "Id" in res: + # print("Deploy request OK") + pass + else: + print(res) + tries = 0 + created = False + while True: + try: + # print(self.endpoint_id) + # print(stack) + if self.get_stack(stack, self.endpoint_id) != 1: + created = True + break + except Exception as e: + print( + f"Waiting for stack {stack} to be created...{tries}/50", + end="\r", + ) + time.sleep(10) + tries += 1 + if tries > 50: + print( + f"Error retrieving stack {stack} after creation: {self.endpoint_name}" + ) + break + logger.debug(f"Exception while getting stack {stack}: {e}") + + if created: + if stack != "pihole": + # print(autostart) + if not autostart: + # self.get_stacks() + # self.stop_stack(stack,self.endpoint_id) + conts = self.get_containers() + # print(conts) + self.stop_containers(self.endpoint_name, conts) + + 
if mode == "file": + print("Creating new stack from file...") + path = "/stacks/create/standalone/file" + if self.endpoint_id is not None: + path += f"?endpointId={self.endpoint_id}" + + if stack == "all": + if self.endpoint_name == "rack": + stacks = self.rack_stacks + elif self.endpoint_name == "m-server": + stacks = self.m_server_stacks + elif self.endpoint_name == "rpi5": + stacks = self.rpi5_stacks + else: + stacks = [stack] + for stack in stacks: + print(f"Working on {stack}") + if os.path.exists(f"{self.repo_dir}/{stack}/.env"): + f = open(f"{self.repo_dir}/{stack}/.env", "r") + + env_vars = f.read().splitlines() + envs = [] + for ev in env_vars: + if ev.startswith("#") or ev.strip() == "": + continue + if "=" in ev: + name, value = ev.split("=", 1) + envs.append({"name": name, "value": value}) + f.close() + # wl(envs) + for e in envs: + # print(f"Env: {e['name']} = {e['value']}") + HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"] + if e["name"] == "RESTART" and self.endpoint_name == "m-server": + e["value"] = "always" + if e["name"] in HWS: + print("Found HW_MODE env var.") + if self.hw_mode: + e["value"] = "hw" + else: + e["value"] = "cpu" + if e["name"] == "LOGGING": + print("Found LOGGING env var.") + if self.log_mode: + e["value"] = "journald" + else: + e["value"] = "syslog" + + file = { + # ("filename", file_object) + "file": ( + "docker-compose.yml", + open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb"), + ), + } + self._api_post_file(path, self.endpoint_id, stack, envs, file) + + def print_stacks(self, args): + """Print a table of stacks, optionally filtered by endpoint.""" + stacks = self.get_stacks() + count = 0 + data = [] + stack_names = [] + for stack in stacks: + # print(stack) + if args.endpoint_id is not None: + if not stack["EndpointId"] in self.endpoints["by_id"]: + continue + if args.endpoint_id != "all": + if self.endpoints["by_name"][args.endpoint_id] != stack["EndpointId"]: + continue + try: + stack_names.append(stack["Name"]) + 
data.append( + [ + stack["Id"], + stack["Name"], + self.endpoints["by_id"][stack["EndpointId"]], + ] + ) + except KeyError as e: + data.append([stack["Id"], stack["Name"], "?"]) + logger.debug( + "KeyError getting endpoint name for stack %s : %s", stack["Name"], e + ) + count += 1 + + data = sorted(data, key=lambda x: x[1]) + headers = ["StackID", "Name", "Endpoint"] + print(tabulate.tabulate(data, headers=headers, tablefmt="github")) + print(f"Total stacks: {count}") + input("Continue...") + # print(sorted(stack_names)) + + def update_containers(self): + all_containers = self.all_data["containers"][self.args.endpoint_id] + #input(all_containers) + service_tuples = [(s[1], s[0]) for s in all_containers if "." not in s[0] and not s[0].startswith("runner-")] + service_tuples = sorted(service_tuples, key=lambda x: x[1]) + service_dict = dict(service_tuples) + # input(service_tuples) + if self.args.service_id is None: + #services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] + service_tuples.insert(0, ("__ALL__", "[Select ALL]")) + service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]")) + service_ids = checkboxlist_dialog( + title="Select one service", + text="Choose a service:", + values=service_tuples + ).run() + elif self.args.service_id == "all": + service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" ] + else: + service_ids = [self.args.service_id] + + if self.args.update is False: + if "__ONLY_CHECK__" in service_ids: + service_ids.remove("__ONLY_CHECK__") + pull = False + print("Checking for updates only...") + else: + pull = True + print("Checking for updates and pulling updates...") + else: + pull = True + print("Checking for updates and pulling updates...") + if "__ALL__" in service_ids: + service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"] + + longest = 0 + for a in service_dict.items(): + # print(a[1]) + if len(a[1]) > longest: + longest = len(a[1]) + #print(longest) + ok = 
"\033[92m✔\033[0m" + err = "\033[91m✖\033[0m" + updates = [] + for service_id in service_ids: + # print(self.all_data["containers"][self.args.endpoint_id]) + + print("\033[?25l", end="") + print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True) + path = f"/docker/{self.get_endpoint_id()}/containers/{service_id}/image_status?refresh=true" + + try: + resp = self._api_get(path, timeout=20) + except ValueError as e: + print(f"Error restarting service: {e}") + return [] + #print(resp) + if resp == 500: + print("?") + elif resp['Status'] == "outdated": + if pull: + #print("Recreate") + self.recreate_container(service_id, pull) + #print(f"Service {service_dict[service_id]:<{longest}} : updated") + updates.append(service_dict[service_id]) + print(ok, end=" ") + for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]: + if name.startswith(service_dict[service_id]): + print(image) + else: + print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True) + #print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m") + updates.append(service_dict[service_id]) + print(err, end=" ") + for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]: + if name.startswith(service_dict[service_id]): + print(image) + else: + print(ok, end=" ") + for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]: + if name.startswith(service_dict[service_id]): + print(image) + if len(updates) > 0: + if pull: + self.gotify_message(f"Services updated: {", ".join(updates)}") + else: + self.gotify_message(f"Services updates available: {', '.join(updates)}") + print("\033[?25h", end="") + return True + + def update_service(self): + all_services = self.get_services(self.get_endpoint_id()) + #input(all_services) + service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services] + service_tuples = sorted(service_tuples, key=lambda x: x[1]) + service_dict = dict(service_tuples) + # input(service_tuples) 
+ if self.args.service_id is None: + #services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] + service_tuples.insert(0, ("__ALL__", "[Select ALL]")) + service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]")) + service_ids = checkboxlist_dialog( + title="Select one service", + text="Choose a service:", + values=service_tuples + ).run() + if "__ONLY_CHECK__" in service_ids: + self.args.update = False + else: + self.args.update = True + if "__ALL__" in service_ids: + service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"] + + elif self.args.service_id == "all": + service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"] + else: + service_ids = [self.args.service_id] + + if self.args.update: + pull = True + print("Checking for updates and pulling updates...") + else: + pull = False + print("Checking for updates only...") + + + longest = 0 + for a in service_dict.items(): + if a[0] == "__ONLY_CHECK__": + continue + # print(a[1]) + if len(a[1]) > longest: + longest = len(a[1]) + #print(longest) + ok = "\033[92m✔\033[0m" + err = "\033[91m✖\033[0m" + for service_id in service_ids: + print("\033[?25l", end="") + print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True) + path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true" + + try: + resp = self._api_get(path, timeout=20) + except ValueError as e: + print(f"Error restarting service: {e}") + return [] + + if resp['Status'] == "outdated": + if pull: + self.restart_srv(service_id, pull) + #print(f"Service {service_dict[service_id]:<{longest}} : updated") + self.gotify_message(f"Service {service_dict[service_id]} updated") + print(f"{ok} updated") + else: + print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True) + #print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m") + self.gotify_message(f"Service update available for {service_dict[service_id]}") 
+ print(err) + else: + print(ok) + print("\033[?25h", end="") + return True + + def update_service2(self): + all_services = self.get_services(self.get_endpoint_id(self.args.endpoint_id)) + + service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services] + service_tuples = sorted(service_tuples, key=lambda x: x[1]) + service_dict = dict(service_tuples) + # input(service_tuples) + if self.args.service_id is None: + #services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] + service_tuples.insert(0, ("__ALL__", "[Select ALL]")) + service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]")) + service_ids = checkboxlist_dialog( + title="Select one service", + text="Choose a service:", + values=service_tuples + ).run() + elif self.args.service_id == "all": + service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"] + else: + service_ids = [self.args.service_id] + if "__ONLY_CHECK__" in service_ids or self.args.update is False: + pull = False + print("Checking for updates only...") + else: + print("Checking for updates and pulling updates...") + pull = True + if "__ALL__" in service_ids: + service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"] + + longest = 0 + for a in service_dict.items(): + # print(a[1]) + if len(a[1]) > longest: + longest = len(a[1]) + #print(longest) + ok = "\033[92m✔\033[0m" + err = "\033[91m✖\033[0m" + for service_id in service_ids: + print("\033[?25l", end="") + print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True) + path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true" + + try: + resp = self._api_get(path, timeout=20) + except ValueError as e: + print(f"Error restarting service: {e}") + return [] + + if resp['Status'] == "outdated": + if pull: + self.restart_srv(service_id, pull) + #print(f"Service {service_dict[service_id]:<{longest}} : updated") + self.gotify_message(f"Service 
{service_dict[service_id]} updated") + print(ok) + else: + print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True) + #print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m") + self.gotify_message(f"Service update available for {service_dict[service_id]}") + print(err) + else: + print(ok) + print("\033[?25h", end="") + return True + + def recreate_container(self,service_id, pull=False): + """Restart a service on an endpoint.""" + path = f"/docker/{self.endpoint_id}/containers/{service_id}/recreate" + # print(path) + params={"pullImage": pull} + try: + resp = self._api_post(path, json=params, timeout=20) + #print(resp) + except ValueError as e: + print(f"Error restarting service: {e}") + return [] + + def restart_srv(self,service_id, pool=False): + """Restart a service on an endpoint.""" + path = f"/endpoints/{self.endpoint_id}/forceupdateservice" + params={"serviceID": service_id, "pullImage": pool} + try: + resp = self._api_put(path, json=params, timeout=20) + # print(resp) + except ValueError as e: + print(f"Error restarting service: {e}") + return [] + + def restart_service(self, endpoint_id, service_id): + stacks = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] + stacks = sorted(stacks, key=lambda x: x[1]) + stack_id = radiolist_dialog( + title="Select one service", + text="Choose a service:", + values=stacks + ).run() + service_dict = dict(stacks) + services = self.get_services(self.endpoint_name, stack_id) + svc_name = service_dict.get(stack_id) + stack_svcs = [] + svc_menu = [] + for s in services: + try: + if svc_name in s['Spec']['Name']: + stack_svcs.append([s['Version']['Index'], s['Spec']['Name']]) + svc_menu.append([s['ID'], s['Spec']['Name']]) + except KeyError as e: + print(e) + + + service_id = radiolist_dialog( + title="Select one service", + text="Choose a service:", + values=svc_menu + ).run() + + self.restart_srv(service_id, False) + + print(f"Service {service_id} : restarted") + return True 
+ + def start_stack(self, stack=None, endpoint_id=None): + """Start one stack or all stacks on an endpoint.""" + if endpoint_id is not None: + print("Getting endpoint") + self.get_endpoint(endpoint_id) + if stack is not None: + for s in stack: + self.stack_ids = [self._resolve_stack_id(s, endpoint_id)] + for stck in self.stack_ids: + path = f"/stacks/{stck}/start" + if self.endpoint_id is not None: + path += f"?endpointId={self.endpoint_id}" + try: + resp = self._api_post_no_body(path, timeout=20) + except ValueError as e: + print(f"Error stoping stack: {e}") + return [] + if "Id" in json.loads(resp): + print( + f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : started" + ) + else: + print( + f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : {json.loads(resp)['message']}" + ) + return True + + def stop_stack(self, stack, endpoint_id): + + """Stop one stack or all stacks on an endpoint.""" + print(f"Stopping stack {stack}") + + if endpoint_id is not None: + self.get_endpoint(endpoint_id) + + if stack is not None: + for s in stack: + self.stack_ids = [self._resolve_stack_id(s, endpoint_id)] + # print(self.stack_ids) + for stck in self.stack_ids: + path = f"/stacks/{stck}/stop" + # print(path) + if self.endpoint_id is not None: + path += f"?endpointId={self.endpoint_id}" + try: + resp = self._api_post_no_body(path, timeout=120) + except NameError as e: + print(f"Error stopping stack: {e}") + return [] + if "Id" in json.loads(resp): + print( + f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : stopped" + ) + else: + print( + f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : {json.loads(resp)['message']}" + ) + return True + + def _resolve_endpoint(self, endpoint_id): + + self.get_endpoints() + if self._debug: + print(endpoint_id) + print(self.endpoints) + if self._is_number(endpoint_id): + self.endpoint_id = int(endpoint_id) + self.endpoint_name = self.endpoints["by_id"][self.endpoint_id] + else: + self.endpoint_name = 
endpoint_id + self.endpoint_id = int(self.endpoints["by_name"][endpoint_id]) + + def _resolve_stack_id(self, stack, endpoint_id): + if stack == "all": + return "all" + + if not self._is_number(stack): + result = self.get_stack(stack, endpoint_id) + return result["Id"] + + return int(stack) + + def _delete_all_stacks(self, endpoint_id): + stacks = self.get_stacks(endpoint_id) + paths = [] + + for s in stacks: + if int(s["EndpointId"]) != int(endpoint_id): + continue + + path = f"/stacks/{s['Id']}?endpointId={endpoint_id}&removeVolumes=true" + paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path]) + + def delete_item(item): + print(f"Delete stack {item[1]} from {item[0]}") + out = self._api_delete(item[2]) + logger.debug("Deleted stack %s from %s: %s", item[1], item[0], out) + + with ThreadPoolExecutor(max_workers=10) as exe: + exe.map(delete_item, paths) + + return "Done" + + def _delete_single_stack(self, stack_id, endpoint_id): + path = f"/stacks/{stack_id}?endpointId={endpoint_id}&removeVolumes=true" + # print(path) + try: + out = self._api_delete(path,timeout=240) + except ValueError as e: + msg = str(e) + if "Conflict for url" in msg: + print("Stack with this name may already exist.") + else: + print(f"Error deleting stack: {e}") + return [] + + return out or [] + + def delete_stack(self, endpoint_id=None, stack=None): + """Delete one stack or all stacks on an endpoint.""" + self._resolve_endpoint(endpoint_id) + endpoint_id = self.endpoint_id + + if stack == "all": + return self._delete_all_stacks(endpoint_id) + else: + for s in stack: + print(f" >> Deleting stack {s} from endpoint {self.endpoint_name}") + stack_id = self._resolve_stack_id(s, endpoint_id) + self._delete_single_stack(stack_id, endpoint_id) + return "Done" + + # def delete_stack(self, endpoint_id=None, stack=None): + # """ + # Return a list of stacks. If endpoint_id is provided, it will be added as a query param. 
+ # """ + # self.get_endpoints() + # if self._is_number(endpoint_id): + # self.endpoint_name = self.endpoints["by_id"][endpoint_id] + # self.endpoint_id = endpoint_id + # else: + # self.endpoint_name = endpoint_id + # self.endpoint_id = self.endpoints["by_name"][endpoint_id] + + # if not self._is_number(endpoint_id): + # endpoint_id = int(self.endpoints["by_name"][endpoint_id]) + + # if not self._is_number(stack) and stack != "all": + # # print(stack) + # # print(self.endpoint_id) + # stack = self.get_stack(stack, self.endpoint_id)["Id"] + # if stack == "all": + # stacks = self.get_stacks(self.endpoint_id) + # paths = [] + # for s in stacks: + # # print(f"Delete stack {s['Name']}") + # # print(s['EndpointId'], endpoint_id) + # if int(s["EndpointId"]) != int(endpoint_id): + # continue + # # print("Deleting stack:", s['Name']) + # path = f"/stacks/{s['Id']}" + # if endpoint_id is not None: + # path += f"?endpointId={endpoint_id}&removeVolumes=true" + # paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path]) + # # input(paths) + + # def delete(c): + # print(f"Delete stack {c[1]} from {c[0]} ") + # out = self._api_delete(c[2]) + # logger.debug(f"Deleted stack {c[1]} from {c[0]}: {out}") + + # with ThreadPoolExecutor(max_workers=10) as exe: + # exe.map(delete, paths) + # return "Done" + # else: + # path = f"/stacks/{stack}" + + # if endpoint_id is not None: + # path += f"?endpointId={endpoint_id}&removeVolumes=true" + # # print(path) + # try: + # # print(path) + # # print(base_url) + # # print(token) + # stacks = self._api_delete(path) + # except Exception as e: + # # print(f"Error creating stack: {e}") + # if "Conflict for url" in str(e): + # print("Stack with this name may already exist.") + # else: + # print(f"Error deleting stack: {e}") + # # print(stacks) + # return [] + # if stacks is None: + # return [] + + # return stacks + + def create_secret(self, name, value, endpoint_id=None, timeout=None): + """Create a Docker secret on the specified 
endpoint.""" + endpoint_id = int(self.endpoints["by_name"][endpoint_id]) + path = f"/endpoints/{endpoint_id}/docker/secrets/create" + encoded = base64.b64encode(value.encode()).decode() + data = {"Name": name, "Data": encoded} + + return self._api_post(path, data, timeout=timeout) diff --git a/portainer.py b/portainer.py index f40f83b..abd35ae 100755 --- a/portainer.py +++ b/portainer.py @@ -36,7 +36,7 @@ else: raise Exception("Failed to authenticate with Vault") # Specify the mount point of your KV engine -VERSION = "0.1.46" +VERSION = "0.1.47" defaults = { "endpoint_id": "vm01",