"""Portainer API wrapper module."""

import os
from concurrent.futures import ThreadPoolExecutor
import json
import uuid
import shutil
import time
import logging
import base64

import tabulate
from git import Repo
import requests

logger = logging.getLogger(__name__)


class Portainer:
    """
    Simple wrapper around the Portainer HTTP API.

    Instantiate with a site name and an optional request timeout, then call
    methods to manage endpoints, stacks and containers.
    """

    def __init__(self, site, timeout=10):
        self.base_url = None
        self.token = None
        self._debug = False
        self.timeout = timeout
        self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
        # self.git_url = "https://gitlab.sectorq.eu/home/docker-compose.git"
        self.stack_name = None
        self.stacks_all = {}
        self.stack_id = None
        self.stack_ids = []
        self.endpoint_name = None
        self.endpoint_id = None
        self.repo_dir = "/tmp/docker-compose"
        self.basic_stacks = [
            "pihole", "nginx", "mosquitto", "webhub", "authentik",
            "bitwarden", "mailu3", "home-assistant", "homepage",
        ]
        self.nas_stacks = self.basic_stacks + [
            "gitlab", "bookstack", "dockermon", "gitea", "grafana",
            "immich", "jupyter", "kestra", "mealie",
        ]
        self.m_server_stacks = self.basic_stacks + [
            "immich", "zabbix-server", "gitea", "unifibrowser", "mediacenter",
            "watchtower", "wazuh", "octoprint", "motioneye", "kestra",
            "bookstack", "wud", "uptime-kuma", "registry", "regsync",
            "dockermon", "grafana", "nextcloud", "semaphore", "node-red",
            "test", "jupyter", "paperless", "mealie", "n8n", "ollama",
            "rancher",
        ]
        self.rpi5_stacks = self.basic_stacks + ["gitlab", "bookstack", "gitea"]
        self.rack_stacks = self.basic_stacks + [
            "gitlab", "bookstack", "dockermon", "gitea", "grafana",
            "immich", "jupyter", "kestra", "mealie",
        ]
        self.log_mode = False
        self.hw_mode = False
        self.all_data = {"containers": {}, "stacks": {}, "endpoints": {}}
        self.get_site(site)
        self.get_endpoints()
        self.get_stacks()
        self.get_containers()

    def get_site(self, site):
        """Select the base URL and API token for the given site."""
        if site == "portainer":
            self.base_url = os.getenv(
                "PORTAINER_URL", "https://portainer.sectorq.eu/api"
            )
            self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc="
        elif site == "port":
            self.base_url = os.getenv("PORTAINER_URL", "https://port.sectorq.eu/api")
            self.token = "ptr_/5RkMCT/j3BTaL32vMSDtXFi76yOXRKVFOrUtzMsl5Y="
        else:
            self.base_url = os.getenv(
                "PORTAINER_URL", "https://portainer.sectorq.eu/api"
            )
            self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc="
        self.get_endpoints()
        self.get_stacks()

    def _is_number(self, s):
        """Check if the input value can be interpreted as a number."""
        try:
            float(s)
            return True
        except (TypeError, ValueError):
            return False

    def _api_get(self, path, timeout=120):
        """Authenticated GET request to the Portainer API; returns parsed JSON."""
        url = f"{self.base_url.rstrip('/')}{path}"
        headers = {"X-API-Key": f"{self.token}"}
        resp = requests.get(url, headers=headers, timeout=timeout)
        resp.raise_for_status()
        return resp.json()

    def _api_post(self, path, payload="", timeout=120):
        """Authenticated POST with a JSON body; returns the raw response text."""
        url = f"{self.base_url.rstrip('/')}{path}"
        headers = {"X-API-Key": f"{self.token}"}
        # print(url)
        # print(payload)
        resp = requests.post(url, headers=headers, json=payload, timeout=timeout)
        return resp.text
Press Enter to continue.") """Example authenticated GET request to Portainer API.""" url = f"{self.base_url.rstrip('/')}{path}" headers = {"X-API-Key": f"{self.token}"} data = {"EndpointId": endpoint_id, "Name": name, "Env": json.dumps(envs)} # print(data) resp = requests.post( url, headers=headers, files=file, data=data, timeout=timeout ) resp.raise_for_status() return resp.json() def _api_post_no_body(self, path, timeout=120): """Example authenticated GET request to Portainer API.""" url = f"{self.base_url.rstrip('/')}{path}" # print(url) headers = {"X-API-Key": f"{self.token}"} resp = requests.post(url, headers=headers, timeout=timeout) return resp.text def _api_delete(self, path, timeout=120): """Example authenticated DELETE request to Portainer API.""" url = f"{self.base_url.rstrip('/')}{path}" headers = {"X-API-Key": f"{self.token}"} resp = requests.delete(url, headers=headers, timeout=timeout) # print(resp) resp.raise_for_status() # print(resp.status_code) return resp.status_code def refresh(self): '''Refresh all data from Portainer.''' self.get_endpoints() self.get_stacks(self) self.get_containers(self) return True def get_stacks(self, endpoint_id="all", timeout=10): '''Get a list of stacks for a specific endpoint or all endpoints.''' if endpoint_id != "all": endpoint_id = self.get_endpoint_id(endpoint_id) path = "/stacks" stcks = [] stacks = self._api_get(path, timeout=timeout) self.stacks_all = {} fail_endponts = [20, 39, 41] # print(json.dumps(stacks,indent=2)) webhooks = {} for s in stacks: # print(type(s["AutoUpdate"]) ) # input(s) if s["EndpointId"] in fail_endponts: continue if not s["EndpointId"] in webhooks: try: webhooks[s["EndpointId"]] = {"webhook": {}} webhooks[self.endpoints["by_id"][s["EndpointId"]]] = {"webhook": {}} except Exception as e: logger.debug( f"Exception while getting webhooks for endpoint {s['EndpointId']}: {e}" ) if not s["EndpointId"] in self.stacks_all: self.stacks_all[s["EndpointId"]] = {"by_id": {}, "by_name": {}} self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]] = { "by_id": {}, "by_name": {}, } self.stacks_all[s["EndpointId"]]["by_id"][s["Id"]] = s["Name"] self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_id"][ s["Id"] ] = s["Name"] self.stacks_all[s["EndpointId"]]["by_name"][s["Name"]] = s["Id"] self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_name"][ s["Name"] ] = s["Id"] # print(s) if "AutoUpdate" in s and s["AutoUpdate"] is not None: if type(s["AutoUpdate"]) is dict and "Webhook" in s["AutoUpdate"]: # print(self.endpoints["by_id"][s['EndpointId']], s['Name'], s["AutoUpdate"]['Webhook']) # print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW") webhooks[s["EndpointId"]][s["Name"]] = s["AutoUpdate"]["Webhook"] webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[ "AutoUpdate" ]["Webhook"] elif s["AutoUpdate"]["Webhook"] != "": webhooks[s["EndpointId"]][s["Name"]] = s["Webhook"] webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[ "Webhook" ] # print(self.stacks_all) if s["EndpointId"] == endpoint_id or endpoint_id == "all": stcks.append(s) # print(stcks) if stcks is None: return [] self.stacks = stacks self.all_data["stacks"] = self.stacks_all self.all_data["webhooks"] = webhooks # input(json.dumps(self.stacks_all,indent=2)) return stcks def update_status(self, endpoint, stack): '''Get the update status of a specific stack on an endpoint.''' path = f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true" # input(path) stats = self._api_get(path) print(stats) 
    def get_endpoint_id(self, endpoint):
        """Get endpoint ID from either ID or name input."""
        if self._is_number(endpoint):
            endpoint = int(endpoint)
            self.endpoint_id = endpoint
            self.endpoint_name = self.endpoints["by_id"][endpoint]
            return endpoint
        self.endpoint_name = endpoint
        self.endpoint_id = self.endpoints["by_name"][endpoint]
        return self.endpoints["by_name"][endpoint]

    def get_endpoint_name(self, endpoint):
        """Get endpoint name from either ID or name input."""
        if self._is_number(endpoint):
            endpoint = int(endpoint)
            self.endpoint_id = endpoint
            self.endpoint_name = self.endpoints["by_id"][endpoint]
            return self.endpoints["by_id"][endpoint]
        self.endpoint_name = endpoint
        self.endpoint_id = self.endpoints["by_name"][endpoint]
        return endpoint

    def get_containers(self, endpoint="all", stack="all", timeout=30):
        """Get a list of containers for a specific endpoint and stack."""
        # print(json.dumps(self.all_data, indent=2))
        cont = []
        data = {}
        if endpoint == "all":
            for s in self.all_data["endpoints"]["by_id"]:
                # print(s)
                if stack == "all":
                    if s not in self.all_data["stacks"]:
                        continue
                    if self.all_data["endpoints_status"][s] != 1:
                        # print(f"Endpoint {self.all_data['endpoints']['by_id'][s]} is offline")
                        continue
                    for e in self.all_data["stacks"][s]["by_name"]:
                        path = (
                            f"/endpoints/{s}/docker/containers/json"
                            f'?all=1&filters={{"label": ["com.docker.compose.project={e}"]}}'
                        )
                        logger.info(f"request : {path}")
                        try:
                            containers = self._api_get(path)
                        except Exception as exc:
                            print(f"failed to get containers from {path}: {exc}")
                            continue
                        contr = []
                        try:
                            for c in containers:
                                cont.append(c["Names"][0].replace("/", ""))
                                contr.append(c["Names"][0].replace("/", ""))
                            if self.all_data["endpoints"]["by_id"][s] in data:
                                data[self.all_data["endpoints"]["by_id"][s]][e] = contr
                            else:
                                data[self.all_data["endpoints"]["by_id"][s]] = {e: contr}
                        except Exception as exc:
                            logger.debug(
                                "Exception while getting containers for stack %s on endpoint %s: %s",
                                e,
                                self.all_data["endpoints"]["by_id"][s],
                                exc,
                            )
            # print(data)
            self.all_data["containers"] = data
        else:
            self.get_containers()
            for i in self.all_data["containers"][endpoint][stack]:
                cont.append(i)
        return cont

    def stop_containers(self, endpoint, containers, timeout=130):
        """Stop containers on an endpoint."""
        if self.all_data["endpoints_status"][endpoint] != 1:
            print(f"Endpoint {self.get_endpoint_name(endpoint)} is offline")
        ep_id = self.endpoints["by_name"][endpoint]

        def stop(c):
            print(f" > Stopping {c}")
            self._api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/stop")

        with ThreadPoolExecutor(max_workers=10) as exe:
            exe.map(stop, containers)

    def start_containers(self, endpoint, containers, timeout=130):
        """Start containers on an endpoint."""
        ep_id = self.endpoints["by_name"][endpoint]

        def start(c):
            print(f" > Starting {c}")
            self._api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/start")

        with ThreadPoolExecutor(max_workers=10) as exe:
            exe.map(start, containers)

    def update_stack(self, endpoint, stack, autostart, timeout=130):
        """Update one stack or all stacks on an endpoint via their auto-update webhooks."""
        stcs = []
        if stack == "all":
            for s in self.all_data["webhooks"][endpoint]:
                stcs.append([s, self.all_data["webhooks"][endpoint][s]])
        else:
            stcs.append([stack, self.all_data["webhooks"][endpoint][stack]])
        # input(stcs)

        def update(c):
            print(f" > Updating {c[0]} on {endpoint}")
            ans = self._api_post_no_body(f"/stacks/webhooks/{c[1]}")
            logger.debug(
                f"Update response for stack {c[0]} on endpoint {endpoint}: {ans}"
            )

        def stop():
            cont = []
            for c in self.all_data["containers"][endpoint]:
                if stack == c or stack == "all":
                    cont += self.all_data["containers"][endpoint][c]
            self.stop_containers(endpoint, cont)

        with ThreadPoolExecutor(max_workers=10) as exe:
            exe.map(update, stcs)
        if not autostart:
            time.sleep(120)
            stop()

    def get_endpoints(self, timeout=10):
        """Get a list of all endpoints."""
        endpoints = self._api_get("/endpoints", timeout=timeout)
        eps = {"by_id": {}, "by_name": {}}
        eps_stats = {}
        for ep in endpoints:
            eps["by_id"][ep["Id"]] = ep["Name"]
            eps["by_name"][ep["Name"]] = ep["Id"]
            eps_stats[ep["Id"]] = ep["Status"]
            eps_stats[ep["Name"]] = ep["Status"]
        self.endpoints = eps
        self.endpoints_names = list(eps["by_name"])
        self.all_data["endpoints"] = eps
        self.all_data["endpoints_status"] = eps_stats
        # input(eps_stats)
        # input(eps)
        return eps

    def get_endpoint(self, endpoint_id=None, timeout=30):
        """Get endpoint ID and name from either ID or name input."""
        self.get_endpoints()
        # print(self.endpoints)
        if self._is_number(endpoint_id):
            endpoint_id = int(endpoint_id)
            self.endpoint_name = self.endpoints["by_id"][endpoint_id]
            self.endpoint_id = endpoint_id
        else:
            self.endpoint_name = endpoint_id
            self.endpoint_id = self.endpoints["by_name"][endpoint_id]
        return self.endpoint_id

    def get_swarm_id(self, endpoint):
        """Get the swarm ID for a specific endpoint."""
        ep_id = self.endpoints["by_name"][endpoint]
        path = f"/endpoints/{ep_id}/docker/info"
        stats = self._api_get(path)
        return stats["Swarm"]["Cluster"]["ID"]

    def get_stack(self, stack=None, endpoint_id=None, timeout=None):
        """Get one stack (or all stack IDs) on an endpoint; raises ValueError if not found."""
        self.get_stacks(endpoint_id)
        if self._is_number(endpoint_id):
            endpoint_id = int(endpoint_id)
        else:
            endpoint_id = int(self.endpoints["by_name"][endpoint_id])
        self.stack_ids = []
        if stack == "all":
            for s in self.stacks:
                # print(s)
                if endpoint_id == s.get("EndpointId"):
                    self.stack_ids.append(s.get("Id"))
            return self.stack_ids
        for s in self.stacks:
            # print(s)
            match_by_id = (
                stack is not None
                and s.get("Id") == stack
                and endpoint_id == s.get("EndpointId")
            )
            # Ensure types match for comparison
            match_by_name = str(s.get("Name")) == str(stack) and endpoint_id == int(
                s.get("EndpointId")
            )
            if match_by_id or match_by_name:
                self.stack_id = s.get("Id")
                self.stack_name = s.get("Name")
                self.stack_ids.append(s.get("Id"))
                return s
        raise ValueError(f"Stack not found: {stack}")

    def create_stack(
        self,
        endpoint,
        stack=None,
        mode="git",
        autostart=False,
        stack_mode="swarm",
    ):
        """Create one stack or all stacks on an endpoint, from git or from a compose file."""
        swarm_id = None
        if stack_mode == "swarm":
            swarm_id = self.get_swarm_id(endpoint)
            p = "swarm"
        else:
            p = "standalone"
        # input(swarm_id)
        self.endpoint_id = self.get_endpoint_id(endpoint)
        if os.path.exists(self.repo_dir):
            shutil.rmtree(self.repo_dir)
        else:
            print(f"Folder '{self.repo_dir}' does not exist.")
        Repo.clone_from(self.git_url, self.repo_dir)
        if mode == "git":
            path = f"/stacks/create/{p}/repository"
            print(p)
            if self.endpoint_id is not None:
                path += f"?endpointId={self.endpoint_id}"
            if stack == "all":
                if self.endpoint_name == "rack":
                    stacks = self.rack_stacks
                elif self.endpoint_name == "m-server":
                    stacks = self.m_server_stacks
                elif self.endpoint_name == "rpi5":
                    stacks = self.rpi5_stacks
                elif self.endpoint_name == "nas":
                    stacks = self.nas_stacks
            else:
                stacks = [stack]
            # print(json.dumps(self.stacks_all, indent=2))
            # input(json.dumps(self.stacks_all, indent=2))
            for stack in stacks:
                if self.endpoint_id in self.stacks_all:
                    # Check if the stack exists by ID or name
                    stack_check = (
                        stack in self.stacks_all[self.endpoint_id]["by_id"]
                        or stack in self.stacks_all[self.endpoint_id]["by_name"]
                    )
                    if stack_check:
                        print(f"Stack {stack} already exists")
                        continue
                print(f"Working on {stack}, stack mode: {stack_mode}")
                if stack_mode == "swarm":
                    env_path = f"{self.repo_dir}/__swarm/{stack}/.env"
                else:
                    env_path = f"{self.repo_dir}/{stack}/.env"
                envs = []
                if os.path.exists(env_path):
                    with open(env_path, "r") as f:
                        env_vars = f.read().splitlines()
                    for ev in env_vars:
                        if ev.startswith("#") or ev.strip() == "":
                            continue
                        if "=" in ev:
                            name, value = ev.split("=", 1)
                            envs.append({"name": name, "value": value})
                for e in envs:
                    # print(f"Env: {e['name']} = {e['value']}")
                    HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
                    if e["name"] == "RESTART" and self.endpoint_name == "m-server":
                        e["value"] = "always"
                    if e["name"] in HWS:
                        # print("Found HW_MODE env var.")
                        e["value"] = "hw" if self.hw_mode else "cpu"
                    if e["name"] == "LOGGING":
                        # print("Found LOGGING env var.")
                        e["value"] = "journald" if self.log_mode else "syslog"
                uid = uuid.uuid4()
                # print(uid)
                req = {
                    "Name": stack,
                    "Env": envs,
                    "AdditionalFiles": [],
                    "AutoUpdate": {
                        "forcePullImage": True,
                        "forceUpdate": False,
                        "webhook": f"{uid}",
                    },
                    "repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git",
                    "ReferenceName": "refs/heads/main",
                    "composeFile": f"{stack}/docker-compose.yml",
                    "ConfigFilePath": f"{stack}/docker-compose.yml",
                    "repositoryAuthentication": True,
                    "repositoryUsername": "jaydee",
                    "repositoryPassword": "glpat-uj-n-eEfTY398PE4vKSS",
                    "AuthorizationType": 0,
                    "TLSSkipVerify": False,
                    "supportRelativePath": True,
                    "fromAppTemplate": False,
                    "registries": [6, 3],
                    "FromAppTemplate": False,
                    "Namespace": "",
                    "CreatedByUserId": "",
                    "Webhook": "",
                    "filesystemPath": "/share/docker_data/portainer/portainer-data/",
                    "RegistryID": 4,
                    "isDetachedFromGit": True,
                    "method": "repository",
                    "swarmID": None,
                }
                if stack_mode == "swarm":
                    req["type"] = "swarm"
                    req["swarmID"] = swarm_id
                    req["composeFile"] = f"__swarm/{stack}/{stack}-swarm.yml"
                    req["ConfigFilePath"] = f"__swarm/{stack}/{stack}-swarm.yml"
                if self._debug:
                    print(json.dumps(req))
                res = self._api_post(path, req)
                if "Id" in res:
                    # print("Deploy request OK")
                    pass
                else:
                    print(res)
                tries = 0
                created = False
                while True:
                    try:
                        # print(self.endpoint_id)
                        # print(stack)
                        self.get_stack(stack, self.endpoint_id)
                        created = True
                        break
                    except Exception as e:
                        print(
                            f"Waiting for stack {stack} to be created...{tries}/50",
                            end="\r",
                        )
                        time.sleep(10)
                        tries += 1
                        if tries > 50:
                            print(
                                f"Error retrieving stack {stack} after creation: {self.endpoint_name}"
                            )
                            break
                        logger.debug(f"Exception while getting stack {stack}: {e}")
                if created and stack != "pihole" and not autostart:
                    # print(autostart)
                    # self.get_stacks()
                    # self.stop_stack(stack, self.endpoint_id)
                    conts = self.get_containers(self.endpoint_name, stack)
                    # print(conts)
                    self.stop_containers(self.endpoint_name, conts)
        if mode == "file":
            print("Creating new stack from file...")
            path = "/stacks/create/standalone/file"
            if self.endpoint_id is not None:
                path += f"?endpointId={self.endpoint_id}"
            if stack == "all":
                if self.endpoint_name == "rack":
                    stacks = self.rack_stacks
                elif self.endpoint_name == "m-server":
                    stacks = self.m_server_stacks
                elif self.endpoint_name == "rpi5":
                    stacks = self.rpi5_stacks
            else:
                stacks = [stack]
            for stack in stacks:
                print(f"Working on {stack}")
                envs = []
                if os.path.exists(f"{self.repo_dir}/{stack}/.env"):
                    with open(f"{self.repo_dir}/{stack}/.env", "r") as f:
                        env_vars = f.read().splitlines()
                    for ev in env_vars:
                        if ev.startswith("#") or ev.strip() == "":
                            continue
                        if "=" in ev:
                            name, value = ev.split("=", 1)
                            envs.append({"name": name, "value": value})
                for e in envs:
                    # print(f"Env: {e['name']} = {e['value']}")
                    HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
                    if e["name"] == "RESTART" and self.endpoint_name == "m-server":
                        e["value"] = "always"
                    if e["name"] in HWS:
                        print("Found HW_MODE env var.")
                        e["value"] = "hw" if self.hw_mode else "cpu"
                    if e["name"] == "LOGGING":
                        print("Found LOGGING env var.")
                        e["value"] = "journald" if self.log_mode else "syslog"
                file = {
                    # ("filename", file_object)
                    "file": (
                        "docker-compose.yml",
                        open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb"),
                    ),
                }
                self._api_post_file(path, self.endpoint_id, stack, envs, file)

    def print_stacks(self, endpoint="all"):
        """Print a table of stacks, optionally filtered by endpoint."""
        stacks = self.get_stacks()
        count = 0
        data = []
        for stack in stacks:
            if endpoint is not None:
                if stack["EndpointId"] not in self.endpoints["by_id"]:
                    continue
                if endpoint != "all":
                    if self.endpoints["by_name"][endpoint] != stack["EndpointId"]:
                        continue
            try:
                data.append(
                    [
                        stack["Id"],
                        stack["Name"],
                        self.endpoints["by_id"][stack["EndpointId"]],
                    ]
                )
            except KeyError as e:
                data.append([stack["Id"], stack["Name"], "?"])
                logger.debug(
                    "KeyError getting endpoint name for stack %s : %s",
                    stack["Name"],
                    e,
                )
            count += 1
        headers = ["StackID", "Name", "Endpoint"]
        print(tabulate.tabulate(data, headers=headers, tablefmt="github"))
        print(f"Total stacks: {count}")

    def start_stack(self, stack=None, endpoint_id=None):
        """Start one stack or all stacks on an endpoint."""
        if endpoint_id is not None:
            print("Getting endpoint")
            self.get_endpoint(endpoint_id)
        if stack is not None:
            self.get_stack(stack, endpoint_id)
        for stck in self.stack_ids:
            path = f"/stacks/{stck}/start"
            if self.endpoint_id is not None:
                path += f"?endpointId={self.endpoint_id}"
            try:
                resp = self._api_post_no_body(path, timeout=20)
            except requests.RequestException as e:
                print(f"Error starting stack: {e}")
                return []
            if "Id" in json.loads(resp):
                print(
                    f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : started"
                )
            else:
                print(
                    f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : "
                    f"{json.loads(resp)['message']}"
                )
        return True

    def stop_stack(self, stack, endpoint_id):
        """Stop one stack or all stacks on an endpoint."""
        print(f"Stopping stack {stack}")
        if endpoint_id is not None:
            self.get_endpoint(endpoint_id)
        if stack == "all":
            self.get_stack(stack, endpoint_id)
        elif stack is not None:
            self.stack_ids = [self._resolve_stack_id(stack, endpoint_id)]
        # print(self.stack_ids)
        for stck in self.stack_ids:
            path = f"/stacks/{stck}/stop"
            # print(path)
            if self.endpoint_id is not None:
                path += f"?endpointId={self.endpoint_id}"
            try:
                resp = self._api_post_no_body(path, timeout=120)
            except requests.RequestException as e:
                print(f"Error stopping stack: {e}")
                return []
            if "Id" in json.loads(resp):
                print(
                    f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : stopped"
                )
            else:
                print(
                    f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : "
                    f"{json.loads(resp)['message']}"
                )
        return True

    def _resolve_endpoint(self, endpoint_id):
        """Resolve an endpoint given by ID or name and cache its id/name."""
        self.get_endpoints()
        if self._debug:
            print(endpoint_id)
            print(self.endpoints)
        if self._is_number(endpoint_id):
            self.endpoint_id = int(endpoint_id)
            self.endpoint_name = self.endpoints["by_id"][self.endpoint_id]
        else:
            self.endpoint_name = endpoint_id
            self.endpoint_id = int(self.endpoints["by_name"][endpoint_id])

    def _resolve_stack_id(self, stack, endpoint_id):
        """Resolve a stack given by ID or name to its numeric stack ID."""
        if stack == "all":
            return "all"
        if not self._is_number(stack):
            result = self.get_stack(stack, endpoint_id)
            return result["Id"]
        return int(stack)

    def _delete_all_stacks(self, endpoint_id):
        """Delete every stack on the given endpoint, including its volumes."""
        stacks = self.get_stacks(endpoint_id)
        paths = []
        for s in stacks:
            if int(s["EndpointId"]) != int(endpoint_id):
                continue
            path = f"/stacks/{s['Id']}?endpointId={endpoint_id}&removeVolumes=true"
            paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path])

        def delete_item(item):
            print(f"Delete stack {item[1]} from {item[0]}")
            out = self._api_delete(item[2])
            logger.debug("Deleted stack %s from %s: %s", item[1], item[0], out)

        with ThreadPoolExecutor(max_workers=10) as exe:
            exe.map(delete_item, paths)
        return "Done"

    def _delete_single_stack(self, stack_id, endpoint_id):
        """Delete a single stack by ID, including its volumes."""
        path = f"/stacks/{stack_id}?endpointId={endpoint_id}&removeVolumes=true"
        try:
            out = self._api_delete(path)
        except requests.RequestException as e:
            msg = str(e)
            if "Conflict for url" in msg:
                print("Stack with this name may already exist.")
            else:
                print(f"Error deleting stack: {e}")
            return []
        return out or []

    def delete_stack(self, endpoint_id=None, stack=None):
        """Delete one stack or all stacks on an endpoint."""
        self._resolve_endpoint(endpoint_id)
        endpoint_id = self.endpoint_id
        stack_id = self._resolve_stack_id(stack, endpoint_id)
        if stack == "all":
            return self._delete_all_stacks(endpoint_id)
        return self._delete_single_stack(stack_id, endpoint_id)
    def create_secret(self, name, value, endpoint_id=None, timeout=None):
        """Create a Docker secret on the specified endpoint."""
        endpoint_id = int(self.endpoints["by_name"][endpoint_id])
        path = f"/endpoints/{endpoint_id}/docker/secrets/create"
        encoded = base64.b64encode(value.encode()).decode()
        data = {"Name": name, "Data": encoded}
        return self._api_post(path, data, timeout=timeout)
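

if __name__ == "__main__":
    # Minimal usage sketch, not part of the wrapper itself: the endpoint and
    # stack names below are examples and assume the "portainer" site defined in
    # get_site() is reachable with its configured token.
    logging.basicConfig(level=logging.INFO)
    client = Portainer("portainer")
    client.print_stacks("m-server")
    print(client.get_containers("m-server", "pihole"))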