Files
portainer/port.py
2025-12-02 21:29:55 +01:00

824 lines
32 KiB
Python

import os
import requests
import json
import uuid
import shutil
import time
import logging
import base64
import tabulate
from git import Repo
from concurrent.futures import ThreadPoolExecutor
logger = logging.getLogger(__name__)
class Portainer:
"""
Simple wrapper around the module-level Portainer helper functions.
Instantiate with base_url and optional token/timeout and call methods
to perform API operations.
"""
def __init__(self, base_url, token, timeout=10):
self.base_url = base_url.rstrip("/")
self.token = token
self.timeout = timeout
self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
self.stack_name = None
self.stacks_all = {}
self.stack_id = None
self.stack_ids = []
self.endpoint_name = None
self.endpoint_id = None
# self.git_url = "https://gitlab.sectorq.eu/home/docker-compose.git"
self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
self.repo_dir = "/tmp/docker-compose"
self.basic_stacks = [
"pihole",
"nginx",
"mosquitto",
"webhub",
"authentik",
"bitwarden",
"mailu3",
"home-assistant",
"homepage",
]
self.nas_stacks = self.basic_stacks + [
"gitlab",
"bookstack",
"dockermon",
"gitea",
"grafana",
"immich",
"jupyter",
"kestra",
"mealie",
]
self.m_server_stacks = self.basic_stacks + [
"immich",
"zabbix-server",
"gitea",
"unifibrowser",
"mediacenter",
"watchtower",
"wazuh",
"octoprint",
"motioneye",
"kestra",
"bookstack",
"wud",
"uptime-kuma",
"registry",
"regsync",
"dockermon",
"grafana",
"nextcloud",
"semaphore",
"node-red",
"test",
"jupyter",
"paperless",
"mealie",
"n8n",
"ollama",
"rancher",
]
self.rpi5_stacks = self.basic_stacks + ["gitlab", "bookstack", "gitea"]
self.rack_stacks = self.basic_stacks + [
"gitlab",
"bookstack",
"dockermon",
"gitea",
"grafana",
"immich",
"jupyter",
"kestra",
"mealie",
]
self.log_mode = False
self.hw_mode = False
self.all_data = {"containers": {}, "stacks": {}, "endpoints": {}}
self.get_endpoints()
self.get_stacks()
self.get_containers()
def is_number(self, s):
"""Check if the input string is a number."""
try:
float(s)
return True
except ValueError:
return False
def api_get(self, path, timeout=120):
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
resp = requests.get(url, headers=headers, timeout=timeout)
resp.raise_for_status()
return resp.json()
def api_post(self, path, json="", timeout=120):
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
# print(url)
# print(json)
resp = requests.post(url, headers=headers, json=json, timeout=timeout)
return resp.text
def api_post_file(self, path, endpoint_id, name, envs, file, timeout=120):
# input("API POST2 called. Press Enter to continue.")
"""Example authenticated GET request to Portainer API."""
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
data = {"EndpointId": endpoint_id, "Name": name, "Env": json.dumps(envs)}
# print(data)
resp = requests.post(
url, headers=headers, files=file, data=data, timeout=timeout
)
resp.raise_for_status()
return resp.json()
def api_post_no_body(self, path, timeout=120):
"""Example authenticated GET request to Portainer API."""
url = f"{self.base_url.rstrip('/')}{path}"
# print(url)
headers = {"X-API-Key": f"{self.token}"}
resp = requests.post(url, headers=headers, timeout=timeout)
return resp.text
def api_delete(self, path, timeout=120):
"""Example authenticated DELETE request to Portainer API."""
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
resp = requests.delete(url, headers=headers, timeout=timeout)
# print(resp)
resp.raise_for_status()
# print(resp.status_code)
return resp.status_code
def refresh(self):
self.get_endpoints()
self.get_stacks(self)
self.get_containers(self)
return True
def get_stacks(self, endpoint_id="all", timeout=10):
if endpoint_id != "all":
endpoint_id = self.get_endpoint_id(endpoint_id)
path = "/stacks"
stcks = []
stacks = self.api_get(path, timeout=timeout)
self.stacks_all = {}
fail_endponts = [20, 39, 41]
# print(json.dumps(stacks,indent=2))
webhooks = {}
for s in stacks:
# print(type(s["AutoUpdate"]) )
# input(s)
if s["EndpointId"] in fail_endponts:
continue
if not s["EndpointId"] in webhooks:
try:
webhooks[s["EndpointId"]] = {"webhook": {}}
webhooks[self.endpoints["by_id"][s["EndpointId"]]] = {"webhook": {}}
except Exception as e:
logger.debug(
f"Exception while getting webhooks for endpoint {s['EndpointId']}: {e}"
)
if not s["EndpointId"] in self.stacks_all:
self.stacks_all[s["EndpointId"]] = {"by_id": {}, "by_name": {}}
self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]] = {
"by_id": {},
"by_name": {},
}
self.stacks_all[s["EndpointId"]]["by_id"][s["Id"]] = s["Name"]
self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_id"][
s["Id"]
] = s["Name"]
self.stacks_all[s["EndpointId"]]["by_name"][s["Name"]] = s["Id"]
self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_name"][
s["Name"]
] = s["Id"]
# print(s)
if "AutoUpdate" in s and s["AutoUpdate"] is not None:
if type(s["AutoUpdate"]) is dict and "Webhook" in s["AutoUpdate"]:
# print(self.endpoints["by_id"][s['EndpointId']], s['Name'], s["AutoUpdate"]['Webhook'])
# print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW")
webhooks[s["EndpointId"]][s["Name"]] = s["AutoUpdate"]["Webhook"]
webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[
"AutoUpdate"
]["Webhook"]
elif s["AutoUpdate"]["Webhook"] != "":
webhooks[s["EndpointId"]][s["Name"]] = s["Webhook"]
webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[
"Webhook"
]
# print(self.stacks_all)
if s["EndpointId"] == endpoint_id or endpoint_id == "all":
stcks.append(s)
# print(stcks)
if stcks is None:
return []
self.stacks = stacks
self.all_data["stacks"] = self.stacks_all
self.all_data["webhooks"] = webhooks
# input(json.dumps(self.stacks_all,indent=2))
return stcks
def get_stack_id(self, endpoint, stack):
pass
def update_status(self, endpoint, stack):
path = f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true"
# input(path)
stats = self.api_get(path)
print(stats)
def get_endpoint_id(self, endpoint):
if self.is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint]
return endpoint
else:
self.endpoint_name = endpoint
self.endpoint_id = self.endpoints["by_name"][endpoint]
return self.endpoints["by_name"][endpoint]
def get_endpoint_name(self, endpoint):
if self.is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint]
return self.endpoints["by_id"][endpoint]
else:
self.endpoint_name = endpoint
self.endpoint_id = self.endpoints["by_name"][endpoint]
return endpoint
def get_containers(self, endpoint="all", stack="all", timeout=30):
# print(json.dumps(self.all_data,indent=2))
# print(endpoint)
# print(stack)
cont = []
data = {}
if endpoint == "all":
for s in self.all_data["endpoints"]["by_id"]:
# print(s)
if stack == "all":
if s not in self.all_data["stacks"]:
continue
if self.all_data["endpoints_status"][s] != 1:
# print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline")
continue
for e in self.all_data["stacks"][s]["by_name"]:
path = (
f"/endpoints/{s}/docker/containers/json"
f'?all=1&filters={{"label": ["com.docker.compose.project={e}"]}}'
)
logging.info(f"request : {path}")
try:
containers = self.api_get(path)
except Exception as e:
print(f"failed to get containers from {path}: {e}")
continue
contr = []
try:
for c in containers:
cont.append(c["Names"][0].replace("/", ""))
contr.append(c["Names"][0].replace("/", ""))
if self.all_data["endpoints"]["by_id"][s] in data:
data[self.all_data["endpoints"]["by_id"][s]][e] = contr
else:
data[self.all_data["endpoints"]["by_id"][s]] = {
e: contr
}
except Exception as e:
logger.debug(
f"Exception while getting containers for stack {e} ",
f"on endpoint {self.all_data['endpoints']['by_id'][s]}: {e}",
)
# print(data)
self.all_data["containers"] = data
else:
self.get_containers()
for i in self.all_data["containers"][endpoint][stack]:
cont.append(i)
return cont
def stop_containers(self, endpoint, containers, timeout=130):
if self.all_data["endpoints_status"][endpoint] != 1:
print(f"Endpoint {self.get_endpoint_name(endpoint)} is offline")
ep_id = self.endpoints["by_name"][endpoint]
def stop(c):
print(f" > Stopping {c}")
self.api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/stop")
# print(f"✔")
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(stop, containers)
# for c in containers:
# print(f" > Stopping {c}")
# self.api_post_no_body(f"/endpoints/{self.endpoints["by_name"][endpoint]}/docker/containers/{c}/stop")
# return 0
def start_containers(self, endpoint, containers, timeout=130):
ep_id = self.endpoints["by_name"][endpoint]
def stop(c):
print(f" > Starting {c}")
self.api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/start")
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(stop, containers)
def update_stack(self, endpoint, stack, autostart, timeout=130):
stcs = []
if stack == "all":
for s in self.all_data["webhooks"][endpoint]:
stcs.append([s, self.all_data["webhooks"][endpoint][s]])
else:
stcs.append([stack, self.all_data["webhooks"][endpoint][stack]])
# input(stcs)
def update(c):
print(f" > Updating {c[0]} on {endpoint}")
ans = self.api_post_no_body(f"/stacks/webhooks/{c[1]}")
logger.debug(
f"Update response for stack {c[0]} on endpoint {endpoint}: {ans}"
)
def stop():
cont = []
for c in self.all_data["containers"][endpoint]:
if stack == c or stack == "all":
cont += self.all_data["containers"][endpoint][c]
self.stop_containers(endpoint, cont)
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(update, stcs)
if not autostart:
time.sleep(120)
cont = []
for c in self.all_data["containers"][endpoint]:
if stack == c or stack == "all":
cont += self.all_data["containers"][endpoint][c]
self.stop_containers(endpoint, cont)
def get_endpoints(self, timeout=10):
endpoints = self.api_get("/endpoints")
eps = {"by_id": {}, "by_name": {}}
eps_stats = {}
for ep in endpoints:
eps["by_id"][ep["Id"]] = ep["Name"]
eps["by_name"][ep["Name"]] = ep["Id"]
eps_stats[ep["Id"]] = ep["Status"]
eps_stats[ep["Name"]] = ep["Status"]
self.endpoints = eps
self.all_data["endpoints"] = eps
self.all_data["endpoints_status"] = eps_stats
# input(eps_stats)
# input(eps)
return eps
def get_endpoint(self, endpoint_id=None, timeout=30):
self.get_endpoints()
# print(self.endpoints)
if self.is_number(endpoint_id):
self.endpoint_name = self.endpoints["by_id"][endpoint_id]
self.endpoint_id = endpoint_id
else:
self.endpoint_name = endpoint_id
self.endpoint_id = self.endpoints["by_name"][endpoint_id]
return self.endpoint_id
def get_swarm_id(self, endpoint):
ep_id = self.endpoints["by_name"][endpoint]
path = f"/endpoints/{ep_id}/docker/info"
stats = self.api_get(path)
return stats["Swarm"]["Cluster"]["ID"]
def get_stack(self, stack=None, endpoint_id=None, timeout=None):
self.get_stacks(endpoint_id)
if not self.is_number(endpoint_id):
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
self.stack_id = []
if stack == "all":
for s in self.stacks:
# print(s)
if endpoint_id == s.get("EndpointId"):
self.stack_ids.append(s.get("Id"))
return self.stack_ids
else:
for s in self.stacks:
# print(s)
match_by_id = (
stack is not None
and s.get("Id") == stack
and endpoint_id == s.get("EndpointId")
)
match_by_name = str(s.get("Name")) == str(stack) and endpoint_id == int(
s.get("EndpointId")
) # Ensure types match for comparison
if match_by_id or match_by_name:
# if (stack is not None and s.get("Id") == stack and endpoint_id == s.get("EndpointId"))
# or str(s.get("Name")) == str(stack) and endpoint_id == int(s.get("EndpointId")):
self.stack_id = s.get("Id")
self.stack_name = s.get("Name")
self.stack_ids.append(s.get("Id"))
return s
raise ValueError(f"Stack not found: {stack}")
def create_stack(
self,
endpoint,
stack=None,
mode="git",
autostart=False,
swarm=False,
timeout=None,
):
if swarm:
swarm_id = self.get_swarm_id(endpoint)
p = "swarm"
env_path = f"{self.repo_dir}/__swarm/{stack}/.env"
else:
p = "standalone"
env_path = f"{self.repo_dir}/{stack}/.env"
# input(swarm_id)
self.endpoint_id = self.get_endpoint_id(endpoint)
if os.path.exists(self.repo_dir):
shutil.rmtree(self.repo_dir)
else:
print(f"Folder '{self.repo_dir}' does not exist.")
Repo.clone_from(self.git_url, self.repo_dir)
if mode == "git":
path = f"/stacks/create/{p}/repository"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
if stack == "all":
if self.endpoint_name == "rack":
stacks = self.rack_stacks
elif self.endpoint_name == "m-server":
stacks = self.m_server_stacks
elif self.endpoint_name == "rpi5":
stacks = self.rpi5_stacks
elif self.endpoint_name == "nas":
stacks = self.nas_stacks
else:
stacks = [stack]
# print(json.dumps(self.stacks_all, indent=2))
# input(json.dumps(self.stacks_all, indent=2))
for stack in stacks:
if self.endpoint_id in self.stacks_all:
# Check if the stack exists by ID or name
stack_check = (
stack in self.stacks_all[self.endpoint_id]["by_id"]
or stack in self.stacks_all[self.endpoint_id]["by_name"]
)
if stack_check:
print(f"Stack {stack} already exist")
continue
print(f"Working on {stack}")
envs = []
if os.path.exists(f"{env_path}"):
f = open(f"{env_path}", "r")
env_vars = f.read().splitlines()
for ev in env_vars:
if ev.startswith("#") or ev.strip() == "":
continue
if "=" in ev:
name, value = ev.split("=", 1)
envs.append({"name": name, "value": value})
f.close()
# wl(envs)
for e in envs:
# print(f"Env: {e['name']} = {e['value']}")
HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
if e["name"] == "RESTART" and self.endpoint_name == "m-server":
e["value"] = "always"
if e["name"] in HWS:
# print("Found HW_MODE env var.")
if self.hw_mode:
e["value"] = "hw"
else:
e["value"] = "cpu"
if e["name"] == "LOGGING":
# print("Found LOGGING env var.")
if self.log_mode:
e["value"] = "journald"
else:
e["value"] = "syslog"
uid = uuid.uuid4()
# print(uid)
req = {
"Name": stack,
"Env": envs,
"AdditionalFiles": [],
"AutoUpdate": {
"forcePullImage": True,
"forceUpdate": False,
"webhook": f"{uid}",
},
"repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git",
"ReferenceName": "refs/heads/main",
"composeFile": f"{stack}/docker-compose.yml",
"ConfigFilePath": f"{stack}/docker-compose.yml",
"repositoryAuthentication": True,
"repositoryUsername": "jaydee",
"repositoryPassword": "glpat-uj-n-eEfTY398PE4vKSS",
"AuthorizationType": 0,
"TLSSkipVerify": False,
"supportRelativePath": True,
"repositoryAuthentication": True,
"fromAppTemplate": False,
"registries": [6, 3],
"FromAppTemplate": False,
"Namespace": "",
"CreatedByUserId": "",
"Webhook": "",
"filesystemPath": "/share/docker_data/portainer/portainer-data/",
"RegistryID": 4,
"isDetachedFromGit": True,
"method": "repository",
"swarmID": None,
}
if swarm:
req["type"] = "swarm"
req["swarmID"] = swarm_id
req["composeFile"] = f"__swarm/{stack}/{stack}-swarm.yml"
req["ConfigFilePath"] = f"__swarm/{stack}/{stack}-swarm.yml"
print(json.dumps(req))
res = self.api_post(path, req)
if "Id" in res:
# print("Deploy request OK")
pass
else:
print(res)
tries = 0
created = False
while True:
try:
# print(self.endpoint_id)
# print(stack)
self.get_stack(stack, self.endpoint_id)
created = True
break
except Exception as e:
print(
f"Waiting for stack {stack} to be created...{tries}/50",
end="\r",
)
time.sleep(10)
tries += 1
if tries > 50:
print(
f"Error retrieving stack {stack} after creation: {self.endpoint_name}"
)
break
logger.debug(f"Exception while getting stack {stack}: {e}")
if created:
if stack != "pihole":
# print(autostart)
if not autostart:
# self.get_stacks()
# self.stop_stack(stack,self.endpoint_id)
conts = self.get_containers(self.endpoint_name, stack)
# print(conts)
self.stop_containers(self.endpoint_name, conts)
if mode == "file":
print("Creating new stack from file...")
path = "/stacks/create/standalone/file"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
if stack == "all":
if self.endpoint_name == "rack":
stacks = self.rack_stacks
elif self.endpoint_name == "m-server":
stacks = self.m_server_stacks
elif self.endpoint_name == "rpi5":
stacks = self.rpi5_stacks
else:
stacks = [stack]
for stack in stacks:
print(f"Working on {stack}")
if os.path.exists(f"{self.repo_dir}/{stack}/.env"):
f = open(f"{self.repo_dir}/{stack}/.env", "r")
env_vars = f.read().splitlines()
envs = []
for ev in env_vars:
if ev.startswith("#") or ev.strip() == "":
continue
if "=" in ev:
name, value = ev.split("=", 1)
envs.append({"name": name, "value": value})
f.close()
# wl(envs)
for e in envs:
# print(f"Env: {e['name']} = {e['value']}")
HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
if e["name"] == "RESTART" and self.endpoint_name == "m-server":
e["value"] = "always"
if e["name"] in HWS:
print("Found HW_MODE env var.")
if self.hw_mode:
e["value"] = "hw"
else:
e["value"] = "cpu"
if e["name"] == "LOGGING":
print("Found LOGGING env var.")
if self.log_mode:
e["value"] = "journald"
else:
e["value"] = "syslog"
file = {
# ("filename", file_object)
"file": (
"docker-compose.yml",
open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb"),
),
}
self.api_post_file(path, self.endpoint_id, stack, envs, file)
def print_stacks(self, endpoint="all"):
stacks = self.get_stacks()
count = 0
data = []
for stack in stacks:
if endpoint is not None:
if not stack["EndpointId"] in self.endpoints["by_id"]:
continue
if endpoint != "all":
if self.endpoints["by_name"][endpoint] != stack["EndpointId"]:
continue
try:
data.append(
[
stack["Id"],
stack["Name"],
self.endpoints["by_id"][stack["EndpointId"]],
]
)
except KeyError as e:
data.append([stack["Id"], stack["Name"], "?"])
logger.debug(
f"KeyError getting endpoint name for stack {stack['Name']}: {e}"
)
count += 1
headers = ["StackID", "Name", "Endpoint"]
print(tabulate(data, headers=headers, tablefmt="github"))
print(f"Total stacks: {count}")
def start_stack(self, stack=None, endpoint_id=None):
if endpoint_id is not None:
print("Getting endpoint")
self.get_endpoint(endpoint_id)
if stack is not None:
self.get_stack(stack, endpoint_id)
for stack in self.stack_ids:
path = f"/stacks/{stack}/start"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
try:
resp = self.api_post_no_body(path, timeout=20)
except Exception as e:
print(f"Error stoping stack: {e}")
return []
if "Id" in json.loads(resp):
print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : started"
)
else:
print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}"
)
return True
def stop_stack(self, stack, endpoint_id):
print(f"Stopping stack {stack}")
if endpoint_id is not None:
self.get_endpoint(endpoint_id)
if stack == "all":
self.get_stack(stack, endpoint_id)
else:
if stack is not None:
self.stack_ids = [self.get_stack(stack, endpoint_id)["Id"]]
for stack in self.stack_ids:
path = f"/stacks/{stack}/stop"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
try:
resp = self.api_post_no_body(path, timeout=120)
except NameError as e:
print(f"Error stopping stack: {e}")
return []
if "Id" in json.loads(resp):
print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : stopped"
)
else:
print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}"
)
return True
def delete_stack(self, endpoint_id=None, stack=None, timeout=None):
"""
Return a list of stacks. If endpoint_id is provided, it will be added as a query param.
"""
self.get_endpoints()
if self.is_number(endpoint_id):
self.endpoint_name = self.endpoints["by_id"][endpoint_id]
self.endpoint_id = endpoint_id
else:
self.endpoint_name = endpoint_id
self.endpoint_id = self.endpoints["by_name"][endpoint_id]
if not self.is_number(endpoint_id):
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
if not self.is_number(stack) and stack != "all":
# print(stack)
# print(self.endpoint_id)
stack = self.get_stack(stack, self.endpoint_id)["Id"]
if stack == "all":
stacks = self.get_stacks(self.endpoint_id)
paths = []
for s in stacks:
# print(f"Delete stack {s['Name']}")
# print(s['EndpointId'], endpoint_id)
if int(s["EndpointId"]) != int(endpoint_id):
continue
# print("Deleting stack:", s['Name'])
path = f"/stacks/{s['Id']}"
if endpoint_id is not None:
path += f"?endpointId={endpoint_id}&removeVolumes=true"
paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path])
# input(paths)
def delete(c):
print(f"Delete stack {c[1]} from {c[0]} ")
out = self.api_delete(c[2])
logger.debug(f"Deleted stack {c[1]} from {c[0]}: {out}")
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(delete, paths)
return "Done"
else:
path = f"/stacks/{stack}"
if endpoint_id is not None:
path += f"?endpointId={endpoint_id}&removeVolumes=true"
# print(path)
try:
# print(path)
# print(base_url)
# print(token)
stacks = self.api_delete(path)
except Exception as e:
# print(f"Error creating stack: {e}")
if "Conflict for url" in str(e):
print("Stack with this name may already exist.")
else:
print(f"Error deleting stack: {e}")
# print(stacks)
return []
if stacks is None:
return []
return stacks
def refresh_status(self, stack, timeout=None):
pass
def __repr__(self):
pass
def create_secret(self, name, value, endpoint_id=None, timeout=None):
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
path = f"/endpoints/{endpoint_id}/docker/secrets/create"
encoded = base64.b64encode(value.encode()).decode()
data = {"Name": name, "Data": encoded}
return self.api_post(path, data, timeout=timeout)