This commit is contained in:
2025-12-02 21:10:52 +01:00
parent c7d4623c2f
commit 1a3e37ed8b
2 changed files with 275 additions and 190 deletions

278
port.py
View File

@@ -20,8 +20,9 @@ class Portainer:
Instantiate with base_url and optional token/timeout and call methods Instantiate with base_url and optional token/timeout and call methods
to perform API operations. to perform API operations.
""" """
def __init__(self, base_url, token, timeout=10): def __init__(self, base_url, token, timeout=10):
self.base_url = base_url.rstrip('/') self.base_url = base_url.rstrip("/")
self.token = token self.token = token
self.timeout = timeout self.timeout = timeout
self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git" self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
@@ -43,7 +44,7 @@ class Portainer:
"bitwarden", "bitwarden",
"mailu3", "mailu3",
"home-assistant", "home-assistant",
"homepage" "homepage",
] ]
self.nas_stacks = self.basic_stacks + [ self.nas_stacks = self.basic_stacks + [
"gitlab", "gitlab",
@@ -54,42 +55,38 @@ class Portainer:
"immich", "immich",
"jupyter", "jupyter",
"kestra", "kestra",
"mealie" "mealie",
] ]
self.m_server_stacks = self.basic_stacks + [ self.m_server_stacks = self.basic_stacks + [
'immich', "immich",
'zabbix-server', "zabbix-server",
'gitea', "gitea",
'unifibrowser', "unifibrowser",
'mediacenter', "mediacenter",
'watchtower', "watchtower",
'wazuh', "wazuh",
'octoprint', "octoprint",
'motioneye', "motioneye",
'kestra', "kestra",
'bookstack',
'wud',
'uptime-kuma',
'registry',
'regsync',
'dockermon',
'grafana',
'nextcloud',
'semaphore',
'node-red',
'test',
'jupyter',
'paperless',
'mealie',
'n8n',
'ollama',
'rancher'
]
self.rpi5_stacks = self.basic_stacks + [
"gitlab",
"bookstack", "bookstack",
"gitea" "wud",
"uptime-kuma",
"registry",
"regsync",
"dockermon",
"grafana",
"nextcloud",
"semaphore",
"node-red",
"test",
"jupyter",
"paperless",
"mealie",
"n8n",
"ollama",
"rancher",
] ]
self.rpi5_stacks = self.basic_stacks + ["gitlab", "bookstack", "gitea"]
self.rack_stacks = self.basic_stacks + [ self.rack_stacks = self.basic_stacks + [
"gitlab", "gitlab",
"bookstack", "bookstack",
@@ -99,7 +96,7 @@ class Portainer:
"immich", "immich",
"jupyter", "jupyter",
"kestra", "kestra",
"mealie" "mealie",
] ]
self.log_mode = False self.log_mode = False
self.hw_mode = False self.hw_mode = False
@@ -136,13 +133,11 @@ class Portainer:
"""Example authenticated GET request to Portainer API.""" """Example authenticated GET request to Portainer API."""
url = f"{self.base_url.rstrip('/')}{path}" url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"} headers = {"X-API-Key": f"{self.token}"}
data = { data = {"EndpointId": endpoint_id, "Name": name, "Env": json.dumps(envs)}
"EndpointId": endpoint_id,
"Name": name,
"Env": json.dumps(envs)
}
# print(data) # print(data)
resp = requests.post(url, headers=headers, files=file, data=data, timeout=timeout) resp = requests.post(
url, headers=headers, files=file, data=data, timeout=timeout
)
resp.raise_for_status() resp.raise_for_status()
return resp.json() return resp.json()
@@ -182,36 +177,49 @@ class Portainer:
for s in stacks: for s in stacks:
# print(type(s["AutoUpdate"]) ) # print(type(s["AutoUpdate"]) )
# input(s) # input(s)
if s['EndpointId'] in fail_endponts: if s["EndpointId"] in fail_endponts:
continue continue
if not s['EndpointId'] in webhooks: if not s["EndpointId"] in webhooks:
try: try:
webhooks[s['EndpointId']] = {"webhook": {}} webhooks[s["EndpointId"]] = {"webhook": {}}
webhooks[self.endpoints["by_id"][s['EndpointId']]] = {"webhook": {}} webhooks[self.endpoints["by_id"][s["EndpointId"]]] = {"webhook": {}}
except Exception as e: except Exception as e:
logger.debug(f"Exception while getting webhooks for endpoint {s['EndpointId']}: {e}") logger.debug(
if not s['EndpointId'] in self.stacks_all: f"Exception while getting webhooks for endpoint {s['EndpointId']}: {e}"
self.stacks_all[s['EndpointId']] = {"by_id": {}, "by_name": {}} )
self.stacks_all[self.endpoints["by_id"][s['EndpointId']]] = {"by_id": {}, "by_name": {}} if not s["EndpointId"] in self.stacks_all:
self.stacks_all[s['EndpointId']]["by_id"][s['Id']] = s['Name'] self.stacks_all[s["EndpointId"]] = {"by_id": {}, "by_name": {}}
self.stacks_all[self.endpoints["by_id"][s['EndpointId']]]["by_id"][s['Id']] = s['Name'] self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]] = {
"by_id": {},
"by_name": {},
}
self.stacks_all[s["EndpointId"]]["by_id"][s["Id"]] = s["Name"]
self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_id"][
s["Id"]
] = s["Name"]
self.stacks_all[s['EndpointId']]["by_name"][s['Name']] = s['Id'] self.stacks_all[s["EndpointId"]]["by_name"][s["Name"]] = s["Id"]
self.stacks_all[self.endpoints["by_id"][s['EndpointId']]]["by_name"][s['Name']] = s['Id'] self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_name"][
s["Name"]
] = s["Id"]
# print(s) # print(s)
if "AutoUpdate" in s and s["AutoUpdate"] is not None: if "AutoUpdate" in s and s["AutoUpdate"] is not None:
if type(s["AutoUpdate"]) is dict and "Webhook" in s["AutoUpdate"]: if type(s["AutoUpdate"]) is dict and "Webhook" in s["AutoUpdate"]:
# print(self.endpoints["by_id"][s['EndpointId']], s['Name'], s["AutoUpdate"]['Webhook']) # print(self.endpoints["by_id"][s['EndpointId']], s['Name'], s["AutoUpdate"]['Webhook'])
# print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW") # print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW")
webhooks[s['EndpointId']][s['Name']] = s['AutoUpdate']['Webhook'] webhooks[s["EndpointId"]][s["Name"]] = s["AutoUpdate"]["Webhook"]
webhooks[self.endpoints["by_id"][s['EndpointId']]][s['Name']] = s['AutoUpdate']['Webhook'] webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[
"AutoUpdate"
]["Webhook"]
elif s["AutoUpdate"]["Webhook"] != "": elif s["AutoUpdate"]["Webhook"] != "":
webhooks[s['EndpointId']][s['Name']] = s['Webhook'] webhooks[s["EndpointId"]][s["Name"]] = s["Webhook"]
webhooks[self.endpoints["by_id"][s['EndpointId']]][s['Name']] = s['Webhook'] webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[
"Webhook"
]
# print(self.stacks_all) # print(self.stacks_all)
if s['EndpointId'] == endpoint_id or endpoint_id == "all": if s["EndpointId"] == endpoint_id or endpoint_id == "all":
stcks.append(s) stcks.append(s)
# print(stcks) # print(stcks)
if stcks is None: if stcks is None:
@@ -269,7 +277,7 @@ class Portainer:
for e in self.all_data["stacks"][s]["by_name"]: for e in self.all_data["stacks"][s]["by_name"]:
path = ( path = (
f"/endpoints/{s}/docker/containers/json" f"/endpoints/{s}/docker/containers/json"
f"?all=1&filters={{\"label\": [\"com.docker.compose.project={e}\"]}}" f'?all=1&filters={{"label": ["com.docker.compose.project={e}"]}}'
) )
logging.info(f"request : {path}") logging.info(f"request : {path}")
try: try:
@@ -285,11 +293,13 @@ class Portainer:
if self.all_data["endpoints"]["by_id"][s] in data: if self.all_data["endpoints"]["by_id"][s] in data:
data[self.all_data["endpoints"]["by_id"][s]][e] = contr data[self.all_data["endpoints"]["by_id"][s]][e] = contr
else: else:
data[self.all_data["endpoints"]["by_id"][s]] = {e: contr} data[self.all_data["endpoints"]["by_id"][s]] = {
e: contr
}
except Exception as e: except Exception as e:
logger.debug( logger.debug(
f"Exception while getting containers for stack {e} ", f"Exception while getting containers for stack {e} ",
f"on endpoint {self.all_data['endpoints']['by_id'][s]}: {e}" f"on endpoint {self.all_data['endpoints']['by_id'][s]}: {e}",
) )
# print(data) # print(data)
self.all_data["containers"] = data self.all_data["containers"] = data
@@ -307,10 +317,9 @@ class Portainer:
def stop(c): def stop(c):
print(f" > Stopping {c}") print(f" > Stopping {c}")
self.api_post_no_body( self.api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/stop")
f"/endpoints/{ep_id}/docker/containers/{c}/stop"
)
# print(f"✔") # print(f"✔")
with ThreadPoolExecutor(max_workers=10) as exe: with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(stop, containers) exe.map(stop, containers)
# for c in containers: # for c in containers:
@@ -323,9 +332,8 @@ class Portainer:
def stop(c): def stop(c):
print(f" > Starting {c}") print(f" > Starting {c}")
self.api_post_no_body( self.api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/start")
f"/endpoints/{ep_id}/docker/containers/{c}/start"
)
with ThreadPoolExecutor(max_workers=10) as exe: with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(stop, containers) exe.map(stop, containers)
@@ -340,10 +348,10 @@ class Portainer:
# input(stcs) # input(stcs)
def update(c): def update(c):
print(f" > Updating {c[0]} on {endpoint}") print(f" > Updating {c[0]} on {endpoint}")
ans = self.api_post_no_body( ans = self.api_post_no_body(f"/stacks/webhooks/{c[1]}")
f"/stacks/webhooks/{c[1]}" logger.debug(
f"Update response for stack {c[0]} on endpoint {endpoint}: {ans}"
) )
logger.debug(f"Update response for stack {c[0]} on endpoint {endpoint}: {ans}")
def stop(): def stop():
cont = [] cont = []
@@ -368,10 +376,10 @@ class Portainer:
eps = {"by_id": {}, "by_name": {}} eps = {"by_id": {}, "by_name": {}}
eps_stats = {} eps_stats = {}
for ep in endpoints: for ep in endpoints:
eps['by_id'][ep['Id']] = ep['Name'] eps["by_id"][ep["Id"]] = ep["Name"]
eps['by_name'][ep['Name']] = ep['Id'] eps["by_name"][ep["Name"]] = ep["Id"]
eps_stats[ep['Id']] = ep['Status'] eps_stats[ep["Id"]] = ep["Status"]
eps_stats[ep['Name']] = ep['Status'] eps_stats[ep["Name"]] = ep["Status"]
self.endpoints = eps self.endpoints = eps
self.all_data["endpoints"] = eps self.all_data["endpoints"] = eps
self.all_data["endpoints_status"] = eps_stats self.all_data["endpoints_status"] = eps_stats
@@ -394,7 +402,7 @@ class Portainer:
ep_id = self.endpoints["by_name"][endpoint] ep_id = self.endpoints["by_name"][endpoint]
path = f"/endpoints/{ep_id}/docker/info" path = f"/endpoints/{ep_id}/docker/info"
stats = self.api_get(path) stats = self.api_get(path)
return stats['Swarm']['Cluster']['ID'] return stats["Swarm"]["Cluster"]["ID"]
def get_stack(self, stack=None, endpoint_id=None, timeout=None): def get_stack(self, stack=None, endpoint_id=None, timeout=None):
self.get_stacks(endpoint_id) self.get_stacks(endpoint_id)
@@ -404,7 +412,7 @@ class Portainer:
if stack == "all": if stack == "all":
for s in self.stacks: for s in self.stacks:
# print(s) # print(s)
if (endpoint_id == s.get("EndpointId")): if endpoint_id == s.get("EndpointId"):
self.stack_ids.append(s.get("Id")) self.stack_ids.append(s.get("Id"))
return self.stack_ids return self.stack_ids
else: else:
@@ -416,10 +424,9 @@ class Portainer:
and endpoint_id == s.get("EndpointId") and endpoint_id == s.get("EndpointId")
) )
match_by_name = ( match_by_name = str(s.get("Name")) == str(stack) and endpoint_id == int(
str(s.get("Name")) == str(stack) s.get("EndpointId")
and endpoint_id == int(s.get("EndpointId")) # Ensure types match for comparison ) # Ensure types match for comparison
)
if match_by_id or match_by_name: if match_by_id or match_by_name:
# if (stack is not None and s.get("Id") == stack and endpoint_id == s.get("EndpointId")) # if (stack is not None and s.get("Id") == stack and endpoint_id == s.get("EndpointId"))
@@ -431,7 +438,15 @@ class Portainer:
raise ValueError(f"Stack not found: {stack}") raise ValueError(f"Stack not found: {stack}")
def create_stack(self, endpoint, stack=None, mode="git", autostart=False, swarm=False, timeout=None): def create_stack(
self,
endpoint,
stack=None,
mode="git",
autostart=False,
swarm=False,
timeout=None,
):
if swarm: if swarm:
swarm_id = self.get_swarm_id(endpoint) swarm_id = self.get_swarm_id(endpoint)
p = "swarm" p = "swarm"
@@ -469,8 +484,8 @@ class Portainer:
# Check if the stack exists by ID or name # Check if the stack exists by ID or name
stack_check = ( stack_check = (
stack in self.stacks_all[self.endpoint_id]['by_id'] stack in self.stacks_all[self.endpoint_id]["by_id"]
or stack in self.stacks_all[self.endpoint_id]['by_name'] or stack in self.stacks_all[self.endpoint_id]["by_name"]
) )
if stack_check: if stack_check:
print(f"Stack {stack} already exist") print(f"Stack {stack} already exist")
@@ -488,24 +503,24 @@ class Portainer:
name, value = ev.split("=", 1) name, value = ev.split("=", 1)
envs.append({"name": name, "value": value}) envs.append({"name": name, "value": value})
f.close() f.close()
# wl(envs) # wl(envs)
for e in envs: for e in envs:
# print(f"Env: {e['name']} = {e['value']}") # print(f"Env: {e['name']} = {e['value']}")
HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"] HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
if e['name'] == "RESTART" and self.endpoint_name == "m-server": if e["name"] == "RESTART" and self.endpoint_name == "m-server":
e['value'] = "always" e["value"] = "always"
if e['name'] in HWS: if e["name"] in HWS:
# print("Found HW_MODE env var.") # print("Found HW_MODE env var.")
if self.hw_mode: if self.hw_mode:
e['value'] = "hw" e["value"] = "hw"
else: else:
e['value'] = "cpu" e["value"] = "cpu"
if e['name'] == "LOGGING": if e["name"] == "LOGGING":
# print("Found LOGGING env var.") # print("Found LOGGING env var.")
if self.log_mode: if self.log_mode:
e['value'] = "journald" e["value"] = "journald"
else: else:
e['value'] = "syslog" e["value"] = "syslog"
uid = uuid.uuid4() uid = uuid.uuid4()
# print(uid) # print(uid)
@@ -516,7 +531,7 @@ class Portainer:
"AutoUpdate": { "AutoUpdate": {
"forcePullImage": True, "forcePullImage": True,
"forceUpdate": False, "forceUpdate": False,
"webhook": f"{uid}" "webhook": f"{uid}",
}, },
"repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git", "repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git",
"ReferenceName": "refs/heads/main", "ReferenceName": "refs/heads/main",
@@ -539,7 +554,7 @@ class Portainer:
"RegistryID": 4, "RegistryID": 4,
"isDetachedFromGit": True, "isDetachedFromGit": True,
"method": "repository", "method": "repository",
"swarmID": None "swarmID": None,
} }
if swarm: if swarm:
req["type"] = "swarm" req["type"] = "swarm"
@@ -564,11 +579,16 @@ class Portainer:
created = True created = True
break break
except Exception as e: except Exception as e:
print(f"Waiting for stack {stack} to be created...{tries}/50", end="\r") print(
f"Waiting for stack {stack} to be created...{tries}/50",
end="\r",
)
time.sleep(10) time.sleep(10)
tries += 1 tries += 1
if tries > 50: if tries > 50:
print(f"Error retrieving stack {stack} after creation: {self.endpoint_name}") print(
f"Error retrieving stack {stack} after creation: {self.endpoint_name}"
)
break break
logger.debug(f"Exception while getting stack {stack}: {e}") logger.debug(f"Exception while getting stack {stack}: {e}")
@@ -611,28 +631,31 @@ class Portainer:
name, value = ev.split("=", 1) name, value = ev.split("=", 1)
envs.append({"name": name, "value": value}) envs.append({"name": name, "value": value})
f.close() f.close()
# wl(envs) # wl(envs)
for e in envs: for e in envs:
# print(f"Env: {e['name']} = {e['value']}") # print(f"Env: {e['name']} = {e['value']}")
HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"] HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
if e['name'] == "RESTART" and self.endpoint_name == "m-server": if e["name"] == "RESTART" and self.endpoint_name == "m-server":
e['value'] = "always" e["value"] = "always"
if e['name'] in HWS: if e["name"] in HWS:
print("Found HW_MODE env var.") print("Found HW_MODE env var.")
if self.hw_mode: if self.hw_mode:
e['value'] = "hw" e["value"] = "hw"
else: else:
e['value'] = "cpu" e["value"] = "cpu"
if e['name'] == "LOGGING": if e["name"] == "LOGGING":
print("Found LOGGING env var.") print("Found LOGGING env var.")
if self.log_mode: if self.log_mode:
e['value'] = "journald" e["value"] = "journald"
else: else:
e['value'] = "syslog" e["value"] = "syslog"
file = { file = {
# ("filename", file_object) # ("filename", file_object)
"file": ("docker-compose.yml", open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb")), "file": (
"docker-compose.yml",
open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb"),
),
} }
self.api_post_file(path, self.endpoint_id, stack, envs, file) self.api_post_file(path, self.endpoint_id, stack, envs, file)
@@ -642,16 +665,24 @@ class Portainer:
data = [] data = []
for stack in stacks: for stack in stacks:
if endpoint is not None: if endpoint is not None:
if not stack['EndpointId'] in self.endpoints['by_id']: if not stack["EndpointId"] in self.endpoints["by_id"]:
continue continue
if endpoint != "all": if endpoint != "all":
if self.endpoints['by_name'][endpoint] != stack['EndpointId']: if self.endpoints["by_name"][endpoint] != stack["EndpointId"]:
continue continue
try: try:
data.append([stack['Id'], stack['Name'], self.endpoints['by_id'][stack['EndpointId']]]) data.append(
[
stack["Id"],
stack["Name"],
self.endpoints["by_id"][stack["EndpointId"]],
]
)
except KeyError as e: except KeyError as e:
data.append([stack['Id'], stack['Name'], "?"]) data.append([stack["Id"], stack["Name"], "?"])
logger.debug(f"KeyError getting endpoint name for stack {stack['Name']}: {e}") logger.debug(
f"KeyError getting endpoint name for stack {stack['Name']}: {e}"
)
count += 1 count += 1
headers = ["StackID", "Name", "Endpoint"] headers = ["StackID", "Name", "Endpoint"]
@@ -674,9 +705,13 @@ class Portainer:
print(f"Error stoping stack: {e}") print(f"Error stoping stack: {e}")
return [] return []
if "Id" in json.loads(resp): if "Id" in json.loads(resp):
print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : started") print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : started"
)
else: else:
print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}") print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}"
)
return True return True
def stop_stack(self, stack, endpoint_id): def stop_stack(self, stack, endpoint_id):
@@ -698,9 +733,13 @@ class Portainer:
print(f"Error stopping stack: {e}") print(f"Error stopping stack: {e}")
return [] return []
if "Id" in json.loads(resp): if "Id" in json.loads(resp):
print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : stopped") print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : stopped"
)
else: else:
print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}") print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}"
)
return True return True
def delete_stack(self, endpoint_id=None, stack=None, timeout=None): def delete_stack(self, endpoint_id=None, stack=None, timeout=None):
@@ -728,13 +767,13 @@ class Portainer:
for s in stacks: for s in stacks:
# print(f"Delete stack {s['Name']}") # print(f"Delete stack {s['Name']}")
# print(s['EndpointId'], endpoint_id) # print(s['EndpointId'], endpoint_id)
if int(s['EndpointId']) != int(endpoint_id): if int(s["EndpointId"]) != int(endpoint_id):
continue continue
# print("Deleting stack:", s['Name']) # print("Deleting stack:", s['Name'])
path = f"/stacks/{s['Id']}" path = f"/stacks/{s['Id']}"
if endpoint_id is not None: if endpoint_id is not None:
path += f"?endpointId={endpoint_id}&removeVolumes=true" path += f"?endpointId={endpoint_id}&removeVolumes=true"
paths.append([self.get_endpoint_name(endpoint_id), s['Name'], path]) paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path])
# input(paths) # input(paths)
def delete(c): def delete(c):
@@ -779,8 +818,5 @@ class Portainer:
endpoint_id = int(self.endpoints["by_name"][endpoint_id]) endpoint_id = int(self.endpoints["by_name"][endpoint_id])
path = f"/endpoints/{endpoint_id}/docker/secrets/create" path = f"/endpoints/{endpoint_id}/docker/secrets/create"
encoded = base64.b64encode(value.encode()).decode() encoded = base64.b64encode(value.encode()).decode()
data = { data = {"Name": name, "Data": encoded}
"Name": name,
"Data": encoded
}
self.api_post(path, data, timeout=timeout) self.api_post(path, data, timeout=timeout)

View File

@@ -7,6 +7,7 @@ import argparse
from tabulate import tabulate from tabulate import tabulate
from port import Portainer from port import Portainer
import logging import logging
VERSION = "0.0.1" VERSION = "0.0.1"
defaults = { defaults = {
@@ -15,42 +16,67 @@ defaults = {
"deploy_mode": "git", "deploy_mode": "git",
"autostart": True, "autostart": True,
"stack_mode": "swarm", "stack_mode": "swarm",
"site": "portainer" "site": "portainer",
} }
parser = argparse.ArgumentParser(description="Portainer helper - use env vars or pass credentials.") parser = argparse.ArgumentParser(
parser.add_argument("--base", description="Portainer helper - use env vars or pass credentials."
"-b", )
default=os.getenv("PORTAINER_URL", "https://portainer.example.com"), parser.add_argument(
help="Base URL for Portainer (ENV: PORTAINER_URL)" "--base",
) "-b",
default=os.getenv("PORTAINER_URL", "https://portainer.example.com"),
help="Base URL for Portainer (ENV: PORTAINER_URL)",
)
parser.add_argument("--site", "-t", type=str, default=None, help="Site") parser.add_argument("--site", "-t", type=str, default=None, help="Site")
parser.add_argument("--endpoint-id", "-e", type=str, default=None, help="Endpoint ID to limit stack operations") parser.add_argument(
parser.add_argument("--refresh-environment", "-R", action="store_true", help="List endpoints") "--endpoint-id",
parser.add_argument("--list-endpoints", "-E", action="store_true", help="List endpoints") "-e",
type=str,
default=None,
help="Endpoint ID to limit stack operations",
)
parser.add_argument(
"--refresh-environment", "-R", action="store_true", help="List endpoints"
)
parser.add_argument(
"--list-endpoints", "-E", action="store_true", help="List endpoints"
)
parser.add_argument("--list-stacks", "-l", action="store_true", help="List stacks") parser.add_argument("--list-stacks", "-l", action="store_true", help="List stacks")
parser.add_argument("--print-all-data", "-A", action="store_true", help="List stacks") parser.add_argument("--print-all-data", "-A", action="store_true", help="List stacks")
parser.add_argument("--list-containers", "-c", action="store_true", help="List containers") parser.add_argument(
"--list-containers", "-c", action="store_true", help="List containers"
)
parser.add_argument("--update-stack", "-U", action="store_true", help="Update stacks") parser.add_argument("--update-stack", "-U", action="store_true", help="Update stacks")
parser.add_argument("--stop-containers", "-O", action="store_true", help="Stop containers") parser.add_argument(
parser.add_argument("--start-containers", "-X", action="store_true", help="Start containers") "--stop-containers", "-O", action="store_true", help="Stop containers"
)
parser.add_argument(
"--start-containers", "-X", action="store_true", help="Start containers"
)
parser.add_argument("--update-status", "-S", action="store_true", help="Update status") parser.add_argument("--update-status", "-S", action="store_true", help="Update status")
parser.add_argument("--get-stack", metavar="NAME_OR_ID", help="Get stack by name or numeric id") parser.add_argument(
"--get-stack", metavar="NAME_OR_ID", help="Get stack by name or numeric id"
)
parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform") parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform")
parser.add_argument("--autostart", "-Z", action="store_true", help="Auto-start created stacks") parser.add_argument(
parser.add_argument("--start-stack", "-x", action='store_true') "--autostart", "-Z", action="store_true", help="Auto-start created stacks"
parser.add_argument("--stop-stack", "-o", action='store_true') )
parser.add_argument("--secrets", "-q", action='store_true') parser.add_argument("--start-stack", "-x", action="store_true")
parser.add_argument("--debug", "-D", action='store_true') parser.add_argument("--stop-stack", "-o", action="store_true")
parser.add_argument("--create-stack", "-n", action='store_true') parser.add_argument("--secrets", "-q", action="store_true")
parser.add_argument("--create-stack_new2", "-N", action='store_true') parser.add_argument("--debug", "-D", action="store_true")
parser.add_argument("--gpu", "-g", action='store_true') parser.add_argument("--create-stack", "-n", action="store_true")
parser.add_argument("--create-stacks", "-C", action='store_true') parser.add_argument("--create-stack_new2", "-N", action="store_true")
parser.add_argument("--refresh-status", "-r", action='store_true') parser.add_argument("--gpu", "-g", action="store_true")
parser.add_argument("--create-stacks", "-C", action="store_true")
parser.add_argument("--refresh-status", "-r", action="store_true")
parser.add_argument("--stack", "-s", type=str, help="Stack ID for operations") parser.add_argument("--stack", "-s", type=str, help="Stack ID for operations")
parser.add_argument("--token-only", action="store_true", help="Print auth token and exit") parser.add_argument(
"--token-only", action="store_true", help="Print auth token and exit"
)
parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds") parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds")
parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode") parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode")
parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode") parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode")
@@ -64,32 +90,32 @@ if _LOG_LEVEL == "DEBUG":
logging.basicConfig( logging.basicConfig(
filename=LOG_FILE, filename=LOG_FILE,
level=logging.DEBUG, level=logging.DEBUG,
format='%(asctime)s : %(levelname)s : %(message)s', format="%(asctime)s : %(levelname)s : %(message)s",
datefmt='%m/%d/%Y %I:%M:%S %p' datefmt="%m/%d/%Y %I:%M:%S %p",
) )
logging.debug('using debug logging') logging.debug("using debug logging")
elif _LOG_LEVEL == "ERROR": elif _LOG_LEVEL == "ERROR":
logging.basicConfig( logging.basicConfig(
filename=LOG_FILE, filename=LOG_FILE,
level=logging.ERROR, level=logging.ERROR,
format='%(asctime)s : %(levelname)s : %(message)s', format="%(asctime)s : %(levelname)s : %(message)s",
datefmt='%m/%d/%Y %I:%M:%S %p' datefmt="%m/%d/%Y %I:%M:%S %p",
) )
logging.info('using error logging') logging.info("using error logging")
elif _LOG_LEVEL == "SCAN": elif _LOG_LEVEL == "SCAN":
logging.basicConfig( logging.basicConfig(
filename=LOG_FILE, filename=LOG_FILE,
level=logging.DEBUG, level=logging.DEBUG,
format='%(asctime)s : %(levelname)s : %(message)s', format="%(asctime)s : %(levelname)s : %(message)s",
datefmt='%m/%d/%Y %I:%M:%S %p' datefmt="%m/%d/%Y %I:%M:%S %p",
) )
logging.info('using scan logging') logging.info("using scan logging")
else: else:
logging.basicConfig( logging.basicConfig(
filename=LOG_FILE, filename=LOG_FILE,
level=logging.INFO, level=logging.INFO,
format='%(asctime)s : %(levelname)s : %(message)s', format="%(asctime)s : %(levelname)s : %(message)s",
datefmt='%m/%d/%Y %I:%M:%S %p' datefmt="%m/%d/%Y %I:%M:%S %p",
) )
logging.info("script started") logging.info("script started")
@@ -125,10 +151,14 @@ def get_portainer_token(base_url, username=None, password=None, timeout=10):
username = username or os.getenv("PORTAINER_USER") username = username or os.getenv("PORTAINER_USER")
password = password or os.getenv("PORTAINER_PASS") password = password or os.getenv("PORTAINER_PASS")
if not username or not password: if not username or not password:
raise ValueError("Username and password must be provided (or set PORTAINER_USER / PORTAINER_PASS).") raise ValueError(
"Username and password must be provided (or set PORTAINER_USER / PORTAINER_PASS)."
)
url = f"{base_url.rstrip('/')}/api/auth" url = f"{base_url.rstrip('/')}/api/auth"
resp = requests.post(url, json={"Username": username, "Password": password}, timeout=timeout) resp = requests.post(
url, json={"Username": username, "Password": password}, timeout=timeout
)
resp.raise_for_status() resp.raise_for_status()
data = resp.json() data = resp.json()
token = data.get("jwt") or data.get("JWT") or data.get("token") token = data.get("jwt") or data.get("JWT") or data.get("token")
@@ -161,28 +191,30 @@ if __name__ == "__main__":
# Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below. # Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below.
# token = get_portainer_token(base,"admin","l4c1j4yd33Du5lo") # or get_portainer_token(base, "admin", "secret") # token = get_portainer_token(base,"admin","l4c1j4yd33Du5lo") # or get_portainer_token(base, "admin", "secret")
if args.action is None: if args.action is None:
actions = ["delete_stack", actions = [
"create_stack", "delete_stack",
"stop_stack", "create_stack",
"start_stack", "stop_stack",
"list_stacks", "start_stack",
"update_stack", "list_stacks",
"secrets", "update_stack",
"print_all_data", "secrets",
"list_endpoints", "print_all_data",
"list_containers", "list_endpoints",
"stop_containers", "list_containers",
"start_containers", "stop_containers",
"refresh_environment", "start_containers",
"refresh_status", "refresh_environment",
"update_status"] "refresh_status",
"update_status",
]
print("Possible actions: ") print("Possible actions: ")
i = 1 i = 1
for a in actions: for a in actions:
print(f" > {i}. {a}") print(f" > {i}. {a}")
i += 1 i += 1
ans = input("\nSelect action to perform: ") ans = input("\nSelect action to perform: ")
args.action = actions[int(ans)-1] args.action = actions[int(ans) - 1]
token = portainer_api_key token = portainer_api_key
# Example: list endpoints # Example: list endpoints
@@ -197,7 +229,7 @@ if __name__ == "__main__":
"influxdb2-admin-token": "l4c1j4yd33Du5lo", "influxdb2-admin-token": "l4c1j4yd33Du5lo",
"ha_influxdb2_admin_token": "l4c1j4yd33Du5lo", "ha_influxdb2_admin_token": "l4c1j4yd33Du5lo",
"wordpress_db_password": "wordpress", "wordpress_db_password": "wordpress",
"wordpress_root_db_password": "wordpress" "wordpress_root_db_password": "wordpress",
} }
for key, value in secrets.items(): for key, value in secrets.items():
res = por.create_secret(key, value, args.endpoint_id, args.timeout) res = por.create_secret(key, value, args.endpoint_id, args.timeout)
@@ -205,23 +237,40 @@ if __name__ == "__main__":
sys.exit() sys.exit()
if args.action == "delete_stack": if args.action == "delete_stack":
args = prompt_missing_args(args, defaults, [ args = prompt_missing_args(
("site", "Site"), args,
("endpoint_id", "Endpoint ID"), defaults,
("stack", "Stack name or ID") [
]) ("site", "Site"),
por.delete_stack(args.endpoint_id, args.stack,) ("endpoint_id", "Endpoint ID"),
("stack", "Stack name or ID"),
],
)
por.delete_stack(
args.endpoint_id,
args.stack,
)
sys.exit() sys.exit()
if args.action == "create_stack": if args.action == "create_stack":
args = prompt_missing_args(args, defaults, [ args = prompt_missing_args(
("site", "Site"), args,
("endpoint_id", "Endpoint ID"), defaults,
("stack", "Stack name or ID"), [
("stack_mode", "Stack mode (swarm or compose)"), ("site", "Site"),
("deploy_mode", "Deploy mode (git or upload)") ("endpoint_id", "Endpoint ID"),
]) ("stack", "Stack name or ID"),
por.create_stack(args.endpoint_id, args.stack, args.deploy_mode, args.autostart, args.stack_mode) ("stack_mode", "Stack mode (swarm or compose)"),
("deploy_mode", "Deploy mode (git or upload)"),
],
)
por.create_stack(
args.endpoint_id,
args.stack,
args.deploy_mode,
args.autostart,
args.stack_mode,
)
sys.exit() sys.exit()
if args.action == "stop_stack": if args.action == "stop_stack":