"""Small client for the Portainer HTTP API (endpoints, stacks, containers)."""

import base64
import json
import logging
import os
import shutil
import sys
import time
import uuid
from concurrent.futures import ThreadPoolExecutor

import requests
import tabulate
from git import Repo

logger = logging.getLogger(__name__)


class Portainer:
    """Wrapper around the Portainer REST API.

    Instantiate with the API base URL and an API token. The constructor
    immediately queries Portainer for endpoints, stacks and containers and
    caches the result in ``self.all_data``.

    Cached lookups:
        * ``self.endpoints`` -- ``{"by_id": {id: name}, "by_name": {name: id}}``
        * ``self.stacks_all`` -- per endpoint (keyed by BOTH endpoint id and
          endpoint name) ``{"by_id": {stack_id: name}, "by_name": {name: id}}``
        * ``self.all_data["webhooks"]`` -- per endpoint (id and name)
          ``{stack_name: webhook_id}`` plus a ``"webhook": {}`` placeholder
        * ``self.all_data["containers"]`` -- ``{endpoint_name: {stack: [names]}}``
    """

    # Stacks on these endpoint ids are ignored while indexing.
    # NOTE(review): presumably endpoints known to be broken -- confirm.
    _SKIPPED_ENDPOINT_IDS = frozenset({20, 39, 41})

    # Env var names in a stack's .env file that select hardware/CPU mode.
    _HW_ENV_NAMES = ("HW_MODE", "HW_MODE1", "HW_MODE2")

    # SECURITY: a personal access token is hard-coded here. It should be
    # rotated and supplied only through the PORTAINER_GIT_PASSWORD environment
    # variable; the literal is kept solely as a backward-compatible fallback.
    _GIT_PASSWORD_ENV = "PORTAINER_GIT_PASSWORD"
    _DEFAULT_GIT_PASSWORD = "glpat-uj-n-eEfTY398PE4vKSS"

    def __init__(self, base_url, token, timeout=10):
        """Store connection settings and prime the endpoint/stack/container caches.

        Args:
            base_url: Portainer API base URL (a trailing slash is stripped).
            token: API key sent in the ``X-API-Key`` header.
            timeout: Stored on the instance; the request helpers currently use
                their own per-call defaults.
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout = timeout
        # SSH remote used for the local clone; Portainer itself is pointed at
        # the HTTPS URL of the same repository (see _build_git_request).
        self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
        self.repo_dir = "/tmp/docker-compose"
        self.stack_name = None
        self.stacks_all = {}
        self.stacks = []
        self.stack_id = None
        self.stack_ids = []
        self.endpoint_name = None
        self.endpoint_id = None
        self.endpoints = {"by_id": {}, "by_name": {}}
        self.basic_stacks = [
            "pihole",
            "nginx",
            "mosquitto",
            "webhub",
            "authentik",
            "bitwarden",
            "mailu3",
            "home-assistant",
            "homepage",
        ]
        self.nas_stacks = self.basic_stacks + [
            "gitlab",
            "bookstack",
            "dockermon",
            "gitea",
            "grafana",
            "immich",
            "jupyter",
            "kestra",
            "mealie",
        ]
        self.m_server_stacks = self.basic_stacks + [
            "immich",
            "zabbix-server",
            "gitea",
            "unifibrowser",
            "mediacenter",
            "watchtower",
            "wazuh",
            "octoprint",
            "motioneye",
            "kestra",
            "bookstack",
            "wud",
            "uptime-kuma",
            "registry",
            "regsync",
            "dockermon",
            "grafana",
            "nextcloud",
            "semaphore",
            "node-red",
            "test",
            "jupyter",
            "paperless",
            "mealie",
            "n8n",
            "ollama",
            "rancher",
        ]
        self.rpi5_stacks = self.basic_stacks + ["gitlab", "bookstack", "gitea"]
        self.rack_stacks = self.basic_stacks + [
            "gitlab",
            "bookstack",
            "dockermon",
            "gitea",
            "grafana",
            "immich",
            "jupyter",
            "kestra",
            "mealie",
        ]
        self.log_mode = False
        self.hw_mode = False
        self.all_data = {"containers": {}, "stacks": {}, "endpoints": {}}
        # Order matters: stacks are indexed by endpoint, containers by stack.
        self.get_endpoints()
        self.get_stacks()
        self.get_containers()

    # ------------------------------------------------------------------
    # Low-level HTTP helpers
    # ------------------------------------------------------------------
    def _is_number(self, s):
        """Return True when ``s`` can be converted with ``float()``."""
        try:
            float(s)
        except (TypeError, ValueError):
            # TypeError covers None and other non-string, non-numeric values.
            return False
        return True

    def _url(self, path):
        """Join the API base URL and ``path``."""
        return f"{self.base_url.rstrip('/')}{path}"

    def _headers(self):
        """Return the authentication headers for every request."""
        return {"X-API-Key": f"{self.token}"}

    def _api_get(self, path, timeout=120):
        """GET ``path`` and return the decoded JSON body.

        Raises:
            requests.HTTPError: on a non-2xx response.
        """
        resp = requests.get(self._url(path), headers=self._headers(), timeout=timeout)
        resp.raise_for_status()
        return resp.json()

    def _api_post(self, path, json="", timeout=120):
        """POST ``json`` as the JSON body and return the raw response text.

        The HTTP status is deliberately not checked; callers inspect the text.
        """
        resp = requests.post(
            self._url(path), headers=self._headers(), json=json, timeout=timeout
        )
        return resp.text

    def _api_post_file(self, path, endpoint_id, name, envs, file, timeout=120):
        """POST a multipart form (stack file upload) and return decoded JSON.

        Args:
            path: API path to post to.
            endpoint_id: Sent as the ``EndpointId`` form field.
            name: Sent as the ``Name`` form field.
            envs: List of env dicts, JSON-encoded into the ``Env`` form field.
            file: ``files`` mapping passed straight to ``requests.post``.
            timeout: Request timeout in seconds.

        Raises:
            requests.HTTPError: on a non-2xx response.
        """
        data = {"EndpointId": endpoint_id, "Name": name, "Env": json.dumps(envs)}
        resp = requests.post(
            self._url(path),
            headers=self._headers(),
            files=file,
            data=data,
            timeout=timeout,
        )
        resp.raise_for_status()
        return resp.json()

    def _api_post_no_body(self, path, timeout=120):
        """POST to ``path`` without a body and return the raw response text."""
        resp = requests.post(self._url(path), headers=self._headers(), timeout=timeout)
        return resp.text

    def _api_delete(self, path, timeout=120):
        """DELETE ``path`` and return the HTTP status code.

        Raises:
            requests.HTTPError: on a non-2xx response.
        """
        resp = requests.delete(
            self._url(path), headers=self._headers(), timeout=timeout
        )
        resp.raise_for_status()
        return resp.status_code

    # ------------------------------------------------------------------
    # Cache refresh / lookups
    # ------------------------------------------------------------------
    def refresh(self):
        """Reload endpoints, stacks and containers from Portainer."""
        self.get_endpoints()
        self.get_stacks()
        self.get_containers()
        return True

    def get_stacks(self, endpoint_id="all", timeout=10):
        """Fetch all stacks, rebuild the stack/webhook indexes, return a selection.

        Args:
            endpoint_id: ``"all"`` or an endpoint id/name to filter the result.
            timeout: Request timeout in seconds.

        Returns:
            The stack dicts (as returned by the API) on the requested endpoint,
            excluding stacks on skipped endpoints.
        """
        if endpoint_id != "all":
            endpoint_id = self.get_endpoint_id(endpoint_id)
        stacks = self._api_get("/stacks", timeout=timeout)
        self.stacks_all = {}
        webhooks = {}
        selected = []
        for entry in stacks:
            ep_id = entry["EndpointId"]
            if ep_id in self._SKIPPED_ENDPOINT_IDS:
                continue
            ep_name = self.endpoints["by_id"].get(ep_id)
            if ep_name is None:
                # The stack references an endpoint Portainer no longer lists;
                # it cannot be indexed by name, so leave it out of the index.
                logger.debug(
                    "Stack %s references unknown endpoint %s; not indexed",
                    entry.get("Name"),
                    ep_id,
                )
            else:
                self._index_stack(entry, ep_id, ep_name, webhooks)
            if endpoint_id == "all" or ep_id == endpoint_id:
                selected.append(entry)
        self.stacks = stacks
        self.all_data["stacks"] = self.stacks_all
        self.all_data["webhooks"] = webhooks
        return selected

    def _index_stack(self, entry, ep_id, ep_name, webhooks):
        """Record one stack in ``self.stacks_all`` and ``webhooks`` (by id and name)."""
        for key in (ep_id, ep_name):
            # The "webhook" placeholder is part of the historical data shape.
            webhooks.setdefault(key, {"webhook": {}})
            index = self.stacks_all.setdefault(key, {"by_id": {}, "by_name": {}})
            index["by_id"][entry["Id"]] = entry["Name"]
            index["by_name"][entry["Name"]] = entry["Id"]

        auto_update = entry.get("AutoUpdate")
        if auto_update is None:
            return
        if isinstance(auto_update, dict) and "Webhook" in auto_update:
            hook = auto_update["Webhook"]
        elif entry.get("Webhook"):
            # Fall back to the top-level webhook when AutoUpdate has none.
            hook = entry["Webhook"]
        else:
            return
        for key in (ep_id, ep_name):
            webhooks[key][entry["Name"]] = hook

    def update_status(self, endpoint, stack):
        """Ask Portainer to refresh the image status of ``stack`` and print it.

        Returns:
            The decoded status response.
        """
        stack_id = self.all_data["stacks"][endpoint]["by_name"][stack]
        stats = self._api_get(f"/stacks/{stack_id}/images_status?refresh=true")
        print(stats)
        return stats

    def get_endpoint_id(self, endpoint):
        """Resolve an endpoint id or name to its id and remember the selection.

        Sets ``self.endpoint_id`` and ``self.endpoint_name``.

        Raises:
            KeyError: if the endpoint is unknown.
        """
        if self._is_number(endpoint):
            ep_id = int(endpoint)  # ids are ints in the API; accept "5" too
            self.endpoint_name = self.endpoints["by_id"][ep_id]
            self.endpoint_id = ep_id
        else:
            self.endpoint_id = self.endpoints["by_name"][endpoint]
            self.endpoint_name = endpoint
        return self.endpoint_id

    def get_endpoint_name(self, endpoint):
        """Resolve an endpoint id or name to its name and remember the selection.

        Sets ``self.endpoint_id`` and ``self.endpoint_name``.

        Raises:
            KeyError: if the endpoint is unknown.
        """
        self.get_endpoint_id(endpoint)
        return self.endpoint_name

    def get_containers(self, endpoint="all", stack="all", timeout=30):
        """Return container names, refreshing the container cache.

        With ``endpoint="all"`` every online endpoint is scanned stack by
        stack and ``self.all_data["containers"]`` is rebuilt. With a specific
        endpoint *name* the cache is rebuilt first and then filtered.

        Args:
            endpoint: ``"all"`` or an endpoint name.
            stack: ``"all"`` or a stack name.
            timeout: Accepted for interface compatibility; currently unused
                (requests use the ``_api_get`` default).

        Returns:
            List of container names (leading ``/`` removed).
        """
        if endpoint != "all":
            self.get_containers()
            per_stack = self.all_data["containers"].get(endpoint, {})
            if stack == "all":
                return [name for names in per_stack.values() for name in names]
            return list(per_stack.get(stack, []))

        data = {}
        for ep_id, ep_name in self.all_data["endpoints"]["by_id"].items():
            if ep_id not in self.all_data["stacks"]:
                continue
            if self.all_data["endpoints_status"][ep_id] != 1:
                # Status 1 is the only value treated as online here.
                continue
            for stack_name in self.all_data["stacks"][ep_id]["by_name"]:
                path = (
                    f"/endpoints/{ep_id}/docker/containers/json"
                    f'?all=1&filters={{"label": ["com.docker.compose.project={stack_name}"]}}'
                )
                logger.info("request : %s", path)
                try:
                    containers = self._api_get(path)
                except Exception as exc:  # best effort: keep scanning other stacks
                    print(f"failed to get containers from {path}: {exc}")
                    continue
                try:
                    names = [c["Names"][0].replace("/", "") for c in containers]
                except (AttributeError, IndexError, KeyError, TypeError) as exc:
                    logger.debug(
                        "Exception while getting containers for stack %s "
                        "on endpoint %s: %s",
                        stack_name,
                        ep_name,
                        exc,
                    )
                    continue
                data.setdefault(ep_name, {})[stack_name] = names
        self.all_data["containers"] = data
        return [
            name
            for per_stack in data.values()
            for stack_name, names in per_stack.items()
            if stack in ("all", stack_name)
            for name in names
        ]

    # ------------------------------------------------------------------
    # Container / stack actions
    # ------------------------------------------------------------------
    def _run_container_action(self, endpoint, containers, action, label):
        """POST ``action`` (``start``/``stop``) for each container in parallel."""
        ep_id = self.endpoints["by_name"][endpoint]

        def run(name):
            print(f" > {label} {name}")
            try:
                self._api_post_no_body(
                    f"/endpoints/{ep_id}/docker/containers/{name}/{action}"
                )
            except requests.RequestException as exc:
                logger.warning(
                    "Failed to %s container %s on %s: %s", action, name, endpoint, exc
                )

        with ThreadPoolExecutor(max_workers=10) as exe:
            # Consume the iterator so unexpected worker errors are not lost.
            list(exe.map(run, containers))

    def stop_containers(self, endpoint, containers, timeout=130):
        """Stop the named containers on ``endpoint`` (an endpoint name).

        Does nothing except print a notice when the endpoint is offline.
        ``timeout`` is accepted for interface compatibility and unused.
        """
        if self.all_data["endpoints_status"][endpoint] != 1:
            print(f"Endpoint {self.get_endpoint_name(endpoint)} is offline")
            return
        self._run_container_action(endpoint, containers, "stop", "Stopping")

    def start_containers(self, endpoint, containers, timeout=130):
        """Start the named containers on ``endpoint`` (an endpoint name).

        ``timeout`` is accepted for interface compatibility and unused.
        """
        self._run_container_action(endpoint, containers, "start", "Starting")

    def update_stack(self, endpoint, stack, autostart, timeout=130, settle_seconds=120):
        """Trigger the update webhook of one stack or of all stacks on an endpoint.

        Args:
            endpoint: Endpoint name (also used to look up cached containers).
            stack: Stack name or ``"all"``.
            autostart: When false, wait ``settle_seconds`` and then stop the
                containers of the updated stack(s) again.
            timeout: Accepted for interface compatibility; unused.
            settle_seconds: Delay before stopping containers when
                ``autostart`` is false.

        Raises:
            KeyError: if the endpoint or the single stack has no webhook entry.
        """
        hooks = self.all_data["webhooks"][endpoint]
        if stack == "all":
            # Skip the {"webhook": {}} placeholder and empty webhook ids.
            targets = [
                (name, hook)
                for name, hook in hooks.items()
                if isinstance(hook, str) and hook
            ]
        else:
            targets = [(stack, hooks[stack])]

        def update(target):
            name, hook = target
            print(f" > Updating {name} on {endpoint}")
            try:
                ans = self._api_post_no_body(f"/stacks/webhooks/{hook}")
            except requests.RequestException as exc:
                logger.warning(
                    "Update of stack %s on endpoint %s failed: %s", name, endpoint, exc
                )
                return
            logger.debug(
                "Update response for stack %s on endpoint %s: %s", name, endpoint, ans
            )

        with ThreadPoolExecutor(max_workers=10) as exe:
            list(exe.map(update, targets))

        if not autostart:
            # Give Portainer time to redeploy before stopping the containers.
            time.sleep(settle_seconds)
            per_stack = self.all_data["containers"].get(endpoint, {})
            to_stop = [
                name
                for stack_name, names in per_stack.items()
                if stack in ("all", stack_name)
                for name in names
            ]
            self.stop_containers(endpoint, to_stop)

    def get_endpoints(self, timeout=10):
        """Fetch all endpoints and rebuild the id/name/status lookups.

        ``timeout`` is accepted for interface compatibility and unused.

        Returns:
            ``{"by_id": {id: name}, "by_name": {name: id}}``.
        """
        endpoints = self._api_get("/endpoints")
        eps = {"by_id": {}, "by_name": {}}
        eps_stats = {}
        for ep in endpoints:
            eps["by_id"][ep["Id"]] = ep["Name"]
            eps["by_name"][ep["Name"]] = ep["Id"]
            # Status is addressable by both id and name.
            eps_stats[ep["Id"]] = ep["Status"]
            eps_stats[ep["Name"]] = ep["Status"]
        self.endpoints = eps
        self.all_data["endpoints"] = eps
        self.all_data["endpoints_status"] = eps_stats
        return eps

    def get_endpoint(self, endpoint_id=None, timeout=30):
        """Refresh endpoints, select ``endpoint_id`` (id or name), return its id.

        ``timeout`` is accepted for interface compatibility and unused.
        """
        self.get_endpoints()
        return self.get_endpoint_id(endpoint_id)

    def get_swarm_id(self, endpoint):
        """Return the swarm cluster id reported by the endpoint's Docker info."""
        ep_id = self.endpoints["by_name"][endpoint]
        stats = self._api_get(f"/endpoints/{ep_id}/docker/info")
        return stats["Swarm"]["Cluster"]["ID"]

    def _find_stack(self, stack, endpoint_id):
        """Refresh stacks and look up ``stack`` on ``endpoint_id`` without exiting.

        Resets ``self.stack_id`` / ``self.stack_ids``. For ``stack == "all"``
        fills ``self.stack_ids`` with every stack id on the endpoint and
        returns None. Otherwise returns the matching stack dict (matched by id
        or by name) or None when nothing matches.
        """
        self.get_stacks(endpoint_id)
        ep_id = int(self.get_endpoint_id(endpoint_id))
        self.stack_id = None
        self.stack_ids = []
        if stack == "all":
            self.stack_ids = [
                s.get("Id") for s in self.stacks if s.get("EndpointId") == ep_id
            ]
            return None
        try:
            wanted_id = int(stack)
        except (TypeError, ValueError):
            wanted_id = None
        for s in self.stacks:
            if s.get("EndpointId") != ep_id:
                continue
            by_id = wanted_id is not None and s.get("Id") == wanted_id
            by_name = str(s.get("Name")) == str(stack)
            if by_id or by_name:
                self.stack_id = s.get("Id")
                self.stack_name = s.get("Name")
                self.stack_ids.append(s.get("Id"))
                return s
        return None

    def get_stack(self, stack=None, endpoint_id=None, timeout=None):
        """Look up one stack (by id or name) or all stack ids on an endpoint.

        Args:
            stack: Stack id, stack name, or ``"all"``.
            endpoint_id: Endpoint id or name.
            timeout: Accepted for interface compatibility; unused.

        Returns:
            For ``"all"``: the list of stack ids on the endpoint
            (also stored in ``self.stack_ids``). Otherwise the stack dict.

        Exits the process with status 1 when a single stack is not found.
        """
        found = self._find_stack(stack, endpoint_id)
        if stack == "all":
            return self.stack_ids
        if found is None:
            print(ValueError(f"Stack not found: {stack}"))
            sys.exit(1)
        return found

    # ------------------------------------------------------------------
    # Stack creation
    # ------------------------------------------------------------------
    def _stacks_to_create(self, stack):
        """Expand ``"all"`` to the preset stack list of the selected endpoint."""
        if stack != "all":
            return [stack]
        presets = {
            "rack": self.rack_stacks,
            "m-server": self.m_server_stacks,
            "rpi5": self.rpi5_stacks,
            "nas": self.nas_stacks,
        }
        try:
            return presets[self.endpoint_name]
        except KeyError:
            raise ValueError(
                f"No preset stack list for endpoint {self.endpoint_name!r}"
            ) from None

    def _load_env(self, env_path, verbose=False):
        """Parse a ``.env`` file into Portainer env dicts and apply overrides.

        Lines starting with ``#``, blank lines and lines without ``=`` are
        skipped. ``RESTART`` (on m-server), the HW mode variables and
        ``LOGGING`` are overridden from instance settings.

        Returns:
            List of ``{"name": ..., "value": ...}``; empty if the file is missing.
        """
        envs = []
        if os.path.exists(env_path):
            with open(env_path, "r", encoding="utf-8") as handle:
                for line in handle.read().splitlines():
                    if line.startswith("#") or line.strip() == "" or "=" not in line:
                        continue
                    name, value = line.split("=", 1)
                    envs.append({"name": name, "value": value})
        for env in envs:
            name = env["name"]
            if name == "RESTART" and self.endpoint_name == "m-server":
                env["value"] = "always"
            if name in self._HW_ENV_NAMES:
                if verbose:
                    print("Found HW_MODE env var.")
                env["value"] = "hw" if self.hw_mode else "cpu"
            if name == "LOGGING":
                if verbose:
                    print("Found LOGGING env var.")
                env["value"] = "journald" if self.log_mode else "syslog"
        return envs

    def _build_git_request(self, stack, envs, swarm, swarm_id):
        """Build the JSON payload for creating ``stack`` from the git repository."""
        req = {
            "Name": stack,
            "Env": envs,
            "AdditionalFiles": [],
            "AutoUpdate": {
                "forcePullImage": True,
                "forceUpdate": False,
                "webhook": f"{uuid.uuid4()}",
            },
            "repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git",
            "ReferenceName": "refs/heads/main",
            "composeFile": f"{stack}/docker-compose.yml",
            "ConfigFilePath": f"{stack}/docker-compose.yml",
            "repositoryAuthentication": True,
            "repositoryUsername": "jaydee",
            "repositoryPassword": os.environ.get(
                self._GIT_PASSWORD_ENV, self._DEFAULT_GIT_PASSWORD
            ),
            "AuthorizationType": 0,
            "TLSSkipVerify": False,
            "supportRelativePath": True,
            "fromAppTemplate": False,
            "registries": [6, 3],
            "FromAppTemplate": False,
            "Namespace": "",
            "CreatedByUserId": "",
            "Webhook": "",
            "filesystemPath": "/share/docker_data/portainer/portainer-data/",
            "RegistryID": 4,
            "isDetachedFromGit": True,
            "method": "repository",
            "swarmID": None,
        }
        if swarm:
            req["type"] = "swarm"
            req["swarmID"] = swarm_id
            req["composeFile"] = f"__swarm/{stack}/{stack}-swarm.yml"
            req["ConfigFilePath"] = f"__swarm/{stack}/{stack}-swarm.yml"
        return req

    def _wait_for_stack(self, stack, attempts=50, delay=10):
        """Poll until ``stack`` is visible on the selected endpoint.

        Returns:
            True once the stack is found, False after ``attempts`` retries.
        """
        for attempt in range(attempts + 1):
            try:
                if self._find_stack(stack, self.endpoint_id) is not None:
                    return True
            except requests.RequestException as exc:
                logger.debug("Exception while getting stack %s: %s", stack, exc)
            print(
                f"Waiting for stack {stack} to be created...{attempt}/{attempts}",
                end="\r",
            )
            time.sleep(delay)
        print(f"Error retrieving stack {stack} after creation: {self.endpoint_name}")
        return False

    def _create_stacks_from_git(self, stack, autostart, swarm, swarm_id):
        """Create the requested stack(s) from the git repository."""
        kind = "swarm" if swarm else "standalone"
        path = f"/stacks/create/{kind}/repository"
        if self.endpoint_id is not None:
            path += f"?endpointId={self.endpoint_id}"
        for name in self._stacks_to_create(stack):
            existing = self.stacks_all.get(self.endpoint_id)
            if existing is not None and (
                name in existing["by_id"] or name in existing["by_name"]
            ):
                print(f"Stack {name} already exist")
                continue
            print(f"Working on {name}")
            # The .env path must be derived per stack, not from the "all" alias.
            if swarm:
                env_path = f"{self.repo_dir}/__swarm/{name}/.env"
            else:
                env_path = f"{self.repo_dir}/{name}/.env"
            envs = self._load_env(env_path)
            req = self._build_git_request(name, envs, swarm, swarm_id)
            # Never emit the repository password in logs or on stdout.
            logger.debug(
                "Create request: %s", json.dumps(dict(req, repositoryPassword="***"))
            )
            res = self._api_post(path, req)
            if "Id" not in res:
                print(res)
            created = self._wait_for_stack(name)
            # pihole is intentionally left running even without autostart.
            if created and name != "pihole" and not autostart:
                conts = self.get_containers(self.endpoint_name, name)
                self.stop_containers(self.endpoint_name, conts)

    def _create_stacks_from_file(self, stack):
        """Create the requested stack(s) by uploading the cloned compose file."""
        print("Creating new stack from file...")
        path = "/stacks/create/standalone/file"
        if self.endpoint_id is not None:
            path += f"?endpointId={self.endpoint_id}"
        for name in self._stacks_to_create(stack):
            print(f"Working on {name}")
            envs = self._load_env(f"{self.repo_dir}/{name}/.env", verbose=True)
            compose_path = f"{self.repo_dir}/{name}/docker-compose.yml"
            with open(compose_path, "rb") as compose:
                file = {"file": ("docker-compose.yml", compose)}
                self._api_post_file(path, self.endpoint_id, name, envs, file)

    def create_stack(
        self,
        endpoint,
        stack=None,
        mode="git",
        autostart=False,
        swarm=False,
    ):
        """Create one stack, or the endpoint's preset stacks, in Portainer.

        The compose repository is cloned fresh into ``self.repo_dir`` first so
        that ``.env`` files (and, in file mode, compose files) can be read.

        Args:
            endpoint: Endpoint id or name to deploy to.
            stack: Stack name, or ``"all"`` for the endpoint's preset list.
            mode: ``"git"`` (Portainer pulls from the repository) or
                ``"file"`` (the compose file is uploaded; standalone only).
            autostart: In git mode, when false the new stack's containers are
                stopped after creation (except for ``pihole``).
            swarm: In git mode, deploy as a swarm stack using the
                ``__swarm/<stack>/`` files.

        Raises:
            ValueError: if ``stack`` is ``"all"`` and the endpoint has no preset.
        """
        swarm_id = self.get_swarm_id(endpoint) if swarm else None
        self.endpoint_id = self.get_endpoint_id(endpoint)
        if os.path.exists(self.repo_dir):
            shutil.rmtree(self.repo_dir)
        else:
            print(f"Folder '{self.repo_dir}' does not exist.")
        Repo.clone_from(self.git_url, self.repo_dir)

        if mode == "git":
            self._create_stacks_from_git(stack, autostart, swarm, swarm_id)
        elif mode == "file":
            self._create_stacks_from_file(stack)

    # ------------------------------------------------------------------
    # Listing / start / stop
    # ------------------------------------------------------------------
    def print_stacks(self, endpoint="all"):
        """Print a table of stacks, optionally filtered by endpoint name."""
        rows = []
        for stack in self.get_stacks():
            if endpoint is not None:
                if stack["EndpointId"] not in self.endpoints["by_id"]:
                    continue
                if (
                    endpoint != "all"
                    and self.endpoints["by_name"][endpoint] != stack["EndpointId"]
                ):
                    continue
            ep_name = self.endpoints["by_id"].get(stack["EndpointId"])
            if ep_name is None:
                logger.debug(
                    "KeyError getting endpoint name for stack %s : %s",
                    stack["Name"],
                    stack["EndpointId"],
                )
                ep_name = "?"
            rows.append([stack["Id"], stack["Name"], ep_name])
        headers = ["StackID", "Name", "Endpoint"]
        # ``tabulate`` is imported as a module; the function lives inside it.
        print(tabulate.tabulate(rows, headers=headers, tablefmt="github"))
        print(f"Total stacks: {len(rows)}")

    def _post_stack_action(self, action, verb, done, timeout):
        """POST ``/stacks/<id>/<action>`` for every id in ``self.stack_ids``.

        Returns:
            True when all requests were sent, ``[]`` on a transport error.
        """
        names = self.stacks_all.get(self.endpoint_id, {}).get("by_id", {})
        for stack_id in self.stack_ids:
            path = f"/stacks/{stack_id}/{action}"
            if self.endpoint_id is not None:
                path += f"?endpointId={self.endpoint_id}"
            try:
                resp = self._api_post_no_body(path, timeout=timeout)
            except requests.RequestException as exc:
                print(f"Error {verb} stack: {exc}")
                return []
            try:
                payload = json.loads(resp)
            except ValueError:
                payload = None
            label = names.get(stack_id, stack_id)
            if isinstance(payload, dict) and "Id" in payload:
                print(f"Stack {label} : {done}")
            elif isinstance(payload, dict) and "message" in payload:
                print(f"Stack {label} : {payload['message']}")
            else:
                print(f"Stack {label} : {resp}")
        return True

    def start_stack(self, stack=None, endpoint_id=None):
        """Start one stack or all stacks on an endpoint.

        Returns:
            True on completion, ``[]`` if a request could not be sent.
        """
        if endpoint_id is not None:
            print("Getting endpoint")
            self.get_endpoint(endpoint_id)
        if stack is not None:
            self.get_stack(stack, endpoint_id)
        return self._post_stack_action("start", "starting", "started", timeout=20)

    def stop_stack(self, stack, endpoint_id):
        """Stop one stack or all stacks on an endpoint.

        Returns:
            True on completion, ``[]`` if a request could not be sent.
        """
        print(f"Stopping stack {stack}")
        if endpoint_id is not None:
            self.get_endpoint(endpoint_id)
        if stack == "all":
            self.get_stack(stack, endpoint_id)
        elif stack is not None:
            self.stack_ids = [self.get_stack(stack, endpoint_id)["Id"]]
        return self._post_stack_action("stop", "stopping", "stopped", timeout=120)

    # ------------------------------------------------------------------
    # Deletion
    # ------------------------------------------------------------------
    def _resolve_endpoint(self, endpoint_id):
        """Refresh endpoints and select ``endpoint_id`` (id or name) as an int id."""
        self.get_endpoints()
        if self._is_number(endpoint_id):
            self.endpoint_id = int(endpoint_id)
            self.endpoint_name = self.endpoints["by_id"][self.endpoint_id]
        else:
            self.endpoint_name = endpoint_id
            self.endpoint_id = int(self.endpoints["by_name"][endpoint_id])

    def _resolve_stack_id(self, stack, endpoint_id):
        """Return ``"all"``, the int stack id, or the id looked up by name."""
        if stack == "all":
            return "all"
        if self._is_number(stack):
            return int(stack)
        return self.get_stack(stack, endpoint_id)["Id"]

    def _delete_all_stacks(self, endpoint_id):
        """Delete every stack on ``endpoint_id`` (including volumes) in parallel."""
        stacks = self.get_stacks(endpoint_id)
        ep_name = self.get_endpoint_name(endpoint_id)
        jobs = [
            (
                ep_name,
                s["Name"],
                f"/stacks/{s['Id']}?endpointId={endpoint_id}&removeVolumes=true",
            )
            for s in stacks
            if int(s["EndpointId"]) == int(endpoint_id)
        ]

        def delete_item(item):
            print(f"Delete stack {item[1]} from {item[0]}")
            try:
                out = self._api_delete(item[2])
            except requests.RequestException as exc:
                logger.warning(
                    "Failed to delete stack %s from %s: %s", item[1], item[0], exc
                )
                return
            logger.debug("Deleted stack %s from %s: %s", item[1], item[0], out)

        with ThreadPoolExecutor(max_workers=10) as exe:
            list(exe.map(delete_item, jobs))
        return "Done"

    def _delete_single_stack(self, stack_id, endpoint_id):
        """Delete one stack (including volumes).

        Returns:
            The HTTP status code, or ``[]`` when the request failed.
        """
        path = f"/stacks/{stack_id}?endpointId={endpoint_id}&removeVolumes=true"
        try:
            out = self._api_delete(path)
        except requests.RequestException as exc:
            # raise_for_status() raises requests.HTTPError, not ValueError.
            if "Conflict for url" in str(exc):
                print("Stack with this name may already exist.")
            else:
                print(f"Error deleting stack: {exc}")
            return []
        return out or []

    def delete_stack(self, endpoint_id=None, stack=None):
        """Delete one stack or all stacks on an endpoint.

        Args:
            endpoint_id: Endpoint id or name.
            stack: Stack id, stack name, or ``"all"``.

        Returns:
            ``"Done"`` for ``"all"``; otherwise the HTTP status code, or ``[]``
            when deletion failed.
        """
        self._resolve_endpoint(endpoint_id)
        endpoint_id = self.endpoint_id
        stack_id = self._resolve_stack_id(stack, endpoint_id)
        if stack == "all":
            return self._delete_all_stacks(endpoint_id)
        return self._delete_single_stack(stack_id, endpoint_id)

    def create_secret(self, name, value, endpoint_id=None, timeout=None):
        """Create a Docker secret on the specified endpoint.

        Args:
            name: Secret name.
            value: Secret value (text); sent base64-encoded.
            endpoint_id: Endpoint name to create the secret on.
            timeout: Request timeout in seconds; ``None`` uses 120 seconds.

        Returns:
            The raw response text.
        """
        endpoint_id = int(self.endpoints["by_name"][endpoint_id])
        path = f"/endpoints/{endpoint_id}/docker/secrets/create"
        encoded = base64.b64encode(value.encode()).decode()
        data = {"Name": name, "Data": encoded}
        # None would disable the HTTP timeout entirely; use a finite default.
        return self._api_post(path, data, timeout=120 if timeout is None else timeout)