Files
portainer/port.py
2025-11-30 23:01:08 +01:00

697 lines
29 KiB
Python

import os
import requests
import json
import uuid
import argparse
import shutil
import time
import logging
from concurrent.futures import ThreadPoolExecutor
logger = logging.getLogger(__name__)
from tabulate import tabulate
from git import Repo # pip install gitpython
class Portainer:
"""
Simple wrapper around the module-level Portainer helper functions.
Instantiate with base_url, an API token and an optional timeout, then call
methods to perform API operations.
"""
def __init__(self, base_url, token, timeout=10):
self.base_url = base_url.rstrip('/')
self.token = token
self.timeout = timeout
self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
self.stack_name = None
self.stacks_all = {}
self.stack_id = None
self.stack_ids = []
self.endpoint_name = None
self.endpoint_id = None
#self.git_url = "https://gitlab.sectorq.eu/home/docker-compose.git"
self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
self.repo_dir = "/tmp/docker-compose"
self.basic_stacks = ["pihole","nginx", "mosquitto", "webhub", "authentik","bitwarden","mailu3","home-assistant","homepage"]
self.nas_stacks = self.basic_stacks + ["gitlab", "bookstack","dockermon","gitea","grafana","immich","jupyter","kestra","mealie"]
self.m_server_stacks = self.basic_stacks + ['immich', 'zabbix-server', 'gitea', 'unifibrowser', 'mediacenter', 'watchtower', 'wazuh', 'octoprint', 'motioneye', 'kestra', 'bookstack', 'wud', 'uptime-kuma', 'registry', 'regsync', 'dockermon', 'grafana', 'nextcloud', 'semaphore', 'node-red', 'test', 'jupyter', 'paperless', 'mealie', 'n8n', 'ollama', 'rancher']
self.rpi5_stacks = self.basic_stacks + ["gitlab","bookstack","gitea"]
self.rack_stacks = self.basic_stacks + ["gitlab", "bookstack","dockermon","gitea","grafana","immich","jupyter","kestra","mealie"]
self.log_mode = False
self.hw_mode = False
self.all_data = {"containers":{},"stacks":{},"endpoints":{}}
self.get_endpoints()
self.get_stacks()
self.get_containers()
def is_number(self, s):
"""Check if the input string is a number."""
try:
float(s)
return True
except ValueError:
return False
def api_get(self, path, timeout=120):
    """Send an authenticated GET to ``base_url + path`` and return the decoded JSON body.

    The API key is sent in the ``X-API-Key`` header. ``raise_for_status()``
    is called on the response before the body is decoded.
    """
    response = requests.get(
        f"{self.base_url.rstrip('/')}{path}",
        headers={"X-API-Key": f"{self.token}"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
def api_post(self, path, json="", timeout=120):
    """Send an authenticated POST with a JSON body and return the raw response text.

    The HTTP status is not checked here; callers inspect the returned text.
    Note that the ``json`` parameter shadows the ``json`` module inside this
    method.
    """
    response = requests.post(
        f"{self.base_url.rstrip('/')}{path}",
        headers={"X-API-Key": f"{self.token}"},
        json=json,
        timeout=timeout,
    )
    return response.text
def api_post_file(self, path, endpoint_id, name, envs, file, timeout=120):
    """Send an authenticated multipart POST and return the decoded JSON body.

    ``file`` is handed to ``requests`` as the ``files`` argument. The form
    fields are ``EndpointId``, ``Name`` and ``Env`` (``envs`` serialised as
    JSON). ``raise_for_status()`` is called before the body is decoded.
    """
    form_fields = {
        "EndpointId": endpoint_id,
        "Name": name,
        "Env": json.dumps(envs),
    }
    response = requests.post(
        f"{self.base_url.rstrip('/')}{path}",
        headers={"X-API-Key": f"{self.token}"},
        files=file,
        data=form_fields,
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
def api_post_no_body(self, path, timeout=120):
    """Send an authenticated POST without a body and return the raw response text.

    The HTTP status is not checked; callers parse the returned text.
    """
    response = requests.post(
        f"{self.base_url.rstrip('/')}{path}",
        headers={"X-API-Key": f"{self.token}"},
        timeout=timeout,
    )
    return response.text
def api_delete(self, path, timeout=120):
    """Send an authenticated DELETE and return the HTTP status code.

    ``raise_for_status()`` is called first, so a returned code always
    belongs to a response that passed that check.
    """
    response = requests.delete(
        f"{self.base_url.rstrip('/')}{path}",
        headers={"X-API-Key": f"{self.token}"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.status_code
def refresh(self):
self.get_endpoints()
self.get_stacks(self)
self.get_containers(self)
def get_stacks(self, endpoint_id="all", timeout=10):
if endpoint_id != "all":
endpoint_id = self.get_endpoint_id(endpoint_id)
path = "/stacks"
stcks = []
stacks = self.api_get(path, timeout=timeout)
self.stacks_all = {}
fail_endponts = [20,39,41]
#print(json.dumps(stacks,indent=2))
webhooks = {}
for s in stacks:
#print(type(s["AutoUpdate"]) )
#input(s)
if s['EndpointId'] in fail_endponts:
continue
if not s['EndpointId'] in webhooks:
try:
webhooks[s['EndpointId']] = {"webhook":{}}
webhooks[self.endpoints["by_id"][s['EndpointId']]] = {"webhook":{}}
except:
pass
if not s['EndpointId'] in self.stacks_all:
self.stacks_all[s['EndpointId']] = {"by_id":{},"by_name":{}}
self.stacks_all[self.endpoints["by_id"][s['EndpointId']]] = {"by_id":{},"by_name":{}}
self.stacks_all[s['EndpointId']]["by_id"][s['Id']] = s['Name']
self.stacks_all[self.endpoints["by_id"][s['EndpointId']]]["by_id"][s['Id']] = s['Name']
self.stacks_all[s['EndpointId']]["by_name"][s['Name']] = s['Id']
self.stacks_all[self.endpoints["by_id"][s['EndpointId']]]["by_name"][s['Name']] = s['Id']
#print(s)
if "AutoUpdate" in s and s["AutoUpdate"] is not None:
if type(s["AutoUpdate"]) == dict and "Webhook" in s["AutoUpdate"]:
#print(self.endpoints["by_id"][s['EndpointId']], s['Name'], s["AutoUpdate"]['Webhook'])
#print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW")
webhooks[s['EndpointId']][s['Name']] = s['AutoUpdate']['Webhook']
webhooks[self.endpoints["by_id"][s['EndpointId']]][s['Name']] = s['AutoUpdate']['Webhook']
elif s["AutoUpdate"]["Webhook"] != "":
webhooks[s['EndpointId']][s['Name']] = s['Webhook']
webhooks[self.endpoints["by_id"][s['EndpointId']]][s['Name']] = s['Webhook']
#print(self.stacks_all)
if s['EndpointId'] == endpoint_id or endpoint_id == "all":
stcks.append(s)
#print(stcks)
if stcks is None:
return []
self.stacks = stacks
self.all_data["stacks"] = self.stacks_all
self.all_data["webhooks"] = webhooks
#input(json.dumps(self.stacks_all,indent=2))
return stcks
def get_stack_id(self,endpoint,stack):
pass
def update_status(self,endpoint,stack):
path = f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true"
#input(path)
stats = self.api_get(path)
print(stats)
def get_endpoint_id(self,endpoint):
if self.is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint]
return endpoint
else:
self.endpoint_name = endpoint
self.endpoint_id = self.endpoints["by_name"][endpoint]
return self.endpoints["by_name"][endpoint]
def get_endpoint_name(self,endpoint):
if self.is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint]
return self.endpoints["by_id"][endpoint]
else:
self.endpoint_name = endpoint
self.endpoint_id = self.endpoints["by_name"][endpoint]
return endpoint
def get_containers(self, endpoint="all", stack="all", timeout=30):
#print(json.dumps(self.all_data,indent=2))
#print(endpoint)
#print(stack)
cont = []
data = {}
if endpoint == "all":
for s in self.all_data["endpoints"]["by_id"]:
#print(s)
if stack == "all":
if not s in self.all_data["stacks"]:
continue
if self.all_data["endpoints_status"][s] != 1:
#print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline")
continue
for e in self.all_data["stacks"][s]["by_name"]:
path = f"/endpoints/{s}/docker/containers/json?all=1&filters={{\"label\": [\"com.docker.compose.project={e}\"]}}"
logging.info(f"request : {path}")
try:
containers = self.api_get(path)
except:
print(f"failed to get containers from {path}")
continue
contr = []
try:
for c in containers:
cont.append(c["Names"][0].replace("/",""))
contr.append(c["Names"][0].replace("/",""))
if self.all_data["endpoints"]["by_id"][s] in data:
data[self.all_data["endpoints"]["by_id"][s]][e] = contr
else:
data[self.all_data["endpoints"]["by_id"][s]] = {e:contr}
except:
print("lalalal")
#print(data)
self.all_data["containers"] = data
else:
self.get_containers()
for i in self.all_data["containers"][endpoint][stack]:
cont.append(i)
return cont
def stop_containers(self, endpoint, containers, timeout=130):
if self.all_data["endpoints_status"][endpoint] != 1:
print(f"Endpoint {self.get_endpoint_name(endpoint)} is offline")
ep_id = self.endpoints["by_name"][endpoint]
def stop(c):
print(f" > Stopping {c}")
self.api_post_no_body(
f"/endpoints/{ep_id}/docker/containers/{c}/stop"
)
#print(f"✔")
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(stop, containers)
# for c in containers:
# print(f" > Stopping {c}")
# self.api_post_no_body(f"/endpoints/{self.endpoints["by_name"][endpoint]}/docker/containers/{c}/stop")
# return 0
def start_containers(self, endpoint, containers, timeout=130):
ep_id = self.endpoints["by_name"][endpoint]
def stop(c):
print(f" > Starting {c}")
self.api_post_no_body(
f"/endpoints/{ep_id}/docker/containers/{c}/start"
)
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(stop, containers)
def update_stack(self, endpoint, stack, autostart, timeout=130):
stcs = []
if stack == "all":
for s in self.all_data["webhooks"][endpoint]:
stcs.append([s,self.all_data["webhooks"][endpoint][s]])
else:
stcs.append([stack, self.all_data["webhooks"][endpoint][stack]])
#input(stcs)
def update(c):
print(f" > Updating {c[0]} on {endpoint}")
ans = self.api_post_no_body(
f"/stacks/webhooks/{c[1]}"
)
def stop():
cont = []
for c in self.all_data["containers"][endpoint]:
if stack == c or stack== "all":
cont+=self.all_data["containers"][endpoint][c]
self.stop_containers(endpoint,cont)
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(update, stcs)
if not autostart:
time.sleep(120)
cont = []
for c in self.all_data["containers"][endpoint]:
if stack == c or stack== "all":
cont+=self.all_data["containers"][endpoint][c]
self.stop_containers(endpoint,cont)
def get_endpoints(self, timeout=10):
endpoints = self.api_get("/endpoints")
eps = {"by_id":{}, "by_name":{}}
eps_stats = {}
for ep in endpoints:
eps['by_id'][ep['Id']] = ep['Name']
eps['by_name'][ep['Name']] = ep['Id']
eps_stats[ep['Id']] = ep['Status']
eps_stats[ep['Name']] = ep['Status']
self.endpoints = eps
self.all_data["endpoints"] = eps
self.all_data["endpoints_status"] = eps_stats
#input(eps_stats)
#input(eps)
return eps
def get_endpoint(self, endpoint_id=None, timeout=30):
self.get_endpoints()
#print(self.endpoints)
if self.is_number(endpoint_id):
self.endpoint_name = self.endpoints["by_id"][endpoint_id]
self.endpoint_id = endpoint_id
else:
self.endpoint_name = endpoint_id
self.endpoint_id = self.endpoints["by_name"][endpoint_id]
return self.endpoint_id
def get_swarm_id(self, endpoint):
ep_id = self.endpoints["by_name"][endpoint]
path = f"/endpoints/{ep_id}/docker/info"
stats = self.api_get(path)
return stats['Swarm']['Cluster']['ID']
def get_stack(self, stack=None, endpoint_id=None, timeout=None):
self.get_stacks(endpoint_id)
if not self.is_number(endpoint_id):
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
self.stack_id = []
if stack == "all":
for s in self.stacks:
#print(s)
if (endpoint_id == s.get("EndpointId")):
self.stack_ids.append(s.get("Id"))
return self.stack_ids
else:
for s in self.stacks:
#print(s)
if (stack is not None and s.get("Id") == stack and endpoint_id == s.get("EndpointId")) or str(s.get("Name")) == str(stack) and endpoint_id == int(s.get("EndpointId")):
self.stack_id = s.get("Id")
self.stack_name = s.get("Name")
self.stack_ids.append(s.get("Id"))
return s
raise ValueError(f"Stack not found: {stack}")
def create_stack(self, endpoint, stack=None, mode="git", autostart=False, swarm=False, timeout=None):
    """Create one stack, or an endpoint's preset list of stacks, in Portainer.

    The compose repository is always re-cloned into ``self.repo_dir`` first
    (an existing directory is removed), because the ``.env`` files and, in
    file mode, the compose files are read from that clone.

    Args:
        endpoint: Endpoint id or name, resolved with get_endpoint_id().
        stack: Stack name, or "all" for the preset list of the selected
            endpoint (rack, m-server, rpi5 or nas).
        mode: "git" deploys from the repository; "file" uploads the
            docker-compose.yml from the fresh clone. Other values only
            perform the clone.
        autostart: Git mode only. When false, the containers of a newly
            created stack are stopped once the stack is visible; the
            "pihole" stack is never stopped.
        swarm: Git mode only. Deploy as a swarm stack using the
            ``__swarm/<stack>/`` layout of the repository.
        timeout: Not used by this method.

    Raises:
        ValueError: ``stack == "all"`` for an endpoint without a preset list.
    """
    swarm_id = self.get_swarm_id(endpoint) if swarm else None
    self.endpoint_id = self.get_endpoint_id(endpoint)

    if os.path.exists(self.repo_dir):
        shutil.rmtree(self.repo_dir)
        print(f"Folder '{self.repo_dir}' has been removed.")
    else:
        print(f"Folder '{self.repo_dir}' does not exist.")
    Repo.clone_from(self.git_url, self.repo_dir)

    if mode == "git":
        self._create_stacks_from_git(stack, autostart, swarm, swarm_id)
    if mode == "file":
        self._create_stacks_from_file(stack)


def _preset_stacks(self, stack):
    """Return ``[stack]``, or the selected endpoint's preset list when stack == "all"."""
    if stack != "all":
        return [stack]
    presets = {
        "rack": self.rack_stacks,
        "m-server": self.m_server_stacks,
        "rpi5": self.rpi5_stacks,
        "nas": self.nas_stacks,
    }
    if self.endpoint_name not in presets:
        raise ValueError(f"No preset stack list for endpoint {self.endpoint_name!r}")
    return presets[self.endpoint_name]


def _load_stack_env(self, env_path, verbose=False):
    """Parse a .env file into Portainer env entries and apply the host-specific overrides.

    A missing file yields an empty list. Comment lines, blank lines and
    lines without "=" are ignored.
    """
    envs = []
    if os.path.exists(env_path):
        with open(env_path, "r") as f:
            for ev in f.read().splitlines():
                if ev.startswith("#") or ev.strip() == "":
                    continue
                if "=" in ev:
                    name, value = ev.split("=", 1)
                    envs.append({"name": name, "value": value})

    hw_names = ("HW_MODE", "HW_MODE1", "HW_MODE2")
    for e in envs:
        if e['name'] == "RESTART" and self.endpoint_name == "m-server":
            e['value'] = "always"
        if e['name'] in hw_names:
            if verbose:
                print("Found HW_MODE env var.")
            e['value'] = "hw" if self.hw_mode else "cpu"
        if e['name'] == "LOGGING":
            if verbose:
                print("Found LOGGING env var.")
            e['value'] = "journald" if self.log_mode else "syslog"
    return envs


def _wait_for_stack(self, name, attempts=50, delay=10):
    """Poll get_stack() until the stack is visible; return False after ``attempts`` retries."""
    tries = 0
    while True:
        try:
            self.get_stack(name, self.endpoint_id)
            return True
        except Exception:
            # Any failure (not found yet, transient API error) means "retry".
            print(f"Waiting for stack {name} to be created...{tries}/{attempts}", end="\r")
            time.sleep(delay)
            tries += 1
            if tries > attempts:
                print(f"Error retrieving stack {name} after creation: {self.endpoint_name}")
                return False


def _create_stacks_from_git(self, stack, autostart, swarm, swarm_id):
    """Create the requested stack(s) through Portainer's repository deployment."""
    print("Creating new stack from git repo...")
    kind = "swarm" if swarm else "standalone"
    path = f"/stacks/create/{kind}/repository"
    if self.endpoint_id is not None:
        path += f"?endpointId={self.endpoint_id}"
    print(path)

    for name in self._preset_stacks(stack):
        known = self.stacks_all.get(self.endpoint_id)
        if known is not None and (name in known['by_id'] or name in known['by_name']):
            print(f"Stack {name} already exist")
            continue
        print(f"Working on {name}")

        # Paths are derived per stack so that "all" reads each stack's own .env.
        if swarm:
            env_path = f"{self.repo_dir}/__swarm/{name}/.env"
            compose = f"__swarm/{name}/{name}-swarm.yml"
        else:
            env_path = f"{self.repo_dir}/{name}/.env"
            compose = f"{name}/docker-compose.yml"
        envs = self._load_stack_env(env_path)

        req = {
            "Name": name,
            "Env": envs,
            "AdditionalFiles": [],
            "AutoUpdate": {
                "forcePullImage": True,
                "forceUpdate": False,
                "webhook": f"{uuid.uuid4()}"
            },
            "repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git",
            "ReferenceName": "refs/heads/main",
            "composeFile": compose,
            "ConfigFilePath": compose,
            "repositoryAuthentication": True,
            # SECURITY: credentials can be supplied through the environment.
            # The literal fallbacks keep existing deployments working; the
            # token should be rotated and the fallback removed.
            "repositoryUsername": os.environ.get("PORTAINER_GIT_USERNAME", "jaydee"),
            "repositoryPassword": os.environ.get("PORTAINER_GIT_PASSWORD", "glpat-uj-n-eEfTY398PE4vKSS"),
            "AuthorizationType": 0,
            "TLSSkipVerify": False,
            "supportRelativePath": True,
            "fromAppTemplate": False,
            "registries": [6,3],
            "FromAppTemplate": False,
            "Namespace": "",
            "CreatedByUserId": "",
            "Webhook": "",
            "filesystemPath": "/share/docker_data/portainer/portainer-data/",
            "RegistryID": 4,
            "isDetachedFromGit": True,
            "method":"repository",
            "swarmID": None
        }
        if swarm:
            req["type"] = "swarm"
            req["swarmID"] = swarm_id

        # Echo the request for debugging, without the repository password.
        print(json.dumps({**req, "repositoryPassword": "***"}))
        res = self.api_post(path, req)
        if "Id" not in res:
            print(res)

        created = self._wait_for_stack(name)
        if created and name != "pihole" and not autostart:
            conts = self.get_containers(self.endpoint_name, name)
            self.stop_containers(self.endpoint_name, conts)


def _create_stacks_from_file(self, stack):
    """Create the requested stack(s) by uploading compose files from the clone."""
    print("Creating new stack from file...")
    path = "/stacks/create/standalone/file"
    if self.endpoint_id is not None:
        path += f"?endpointId={self.endpoint_id}"

    for name in self._preset_stacks(stack):
        print(f"Working on {name}")
        # Always a fresh list: a stack without .env is created with no variables.
        envs = self._load_stack_env(f"{self.repo_dir}/{name}/.env", verbose=True)
        with open(f"{self.repo_dir}/{name}/docker-compose.yml", "rb") as compose:
            # ("filename", file_object)
            upload = {"file": ("docker-compose.yml", compose)}
            self.api_post_file(path, self.endpoint_id, name, envs, upload)
def print_stacks(self,endpoint="all"):
    """Print a GitHub-style table of stacks (id, name, endpoint) followed by the row count.

    Args:
        endpoint: "all", an endpoint name, or None. Unless it is None,
            stacks whose endpoint id is unknown are left out; a name other
            than "all" additionally restricts the table to that endpoint.
    """
    rows = []
    for entry in self.get_stacks():
        if endpoint != None:
            if entry['EndpointId'] not in self.endpoints['by_id']:
                continue
            if endpoint != "all" and self.endpoints['by_name'][endpoint] != entry['EndpointId']:
                continue
        try:
            row = [entry['Id'], entry['Name'], self.endpoints['by_id'][entry['EndpointId']]]
        except KeyError:
            # Endpoint name cannot be resolved; show a placeholder instead.
            row = [entry['Id'], entry['Name'], "?"]
        rows.append(row)
    print(tabulate(rows, headers=["StackID", "Name", "Endpoint"], tablefmt="github"))
    print(f"Total stacks: {len(rows)}")
def start_stack(self,stack=None,endpoint_id=None):
if endpoint_id != None:
print("Getting endpoint")
self.get_endpoint(endpoint_id)
if stack != None:
self.get_stack(stack,endpoint_id)
for stack in self.stack_ids:
path = f"/stacks/{stack}/start"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
try:
resp = self.api_post_no_body(path, timeout=20)
except Exception as e:
print(f"Error stoping stack: {e}")
return []
if "Id" in json.loads(resp):
print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : started")
else:
print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}")
return True
def stop_stack(self,stack,endpoint_id):
print(f"Stopping stack {stack}")
if endpoint_id != None:
self.get_endpoint(endpoint_id)
if stack == "all":
self.get_stack(stack,endpoint_id)
else:
if stack != None:
self.stack_ids = [self.get_stack(stack,endpoint_id)["Id"]]
for stack in self.stack_ids:
path = f"/stacks/{stack}/stop"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
try:
resp = self.api_post_no_body(path, timeout=120)
except NameError as e:
print(f"Error stoping stack: {e}")
return []
if "Id" in json.loads(resp):
print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : stopped")
else:
print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}")
return True
def delete_stack(self, endpoint_id=None, stack=None, timeout=None):
"""
Return a list of stacks. If endpoint_id is provided, it will be added as a query param.
"""
self.get_endpoints()
if self.is_number(endpoint_id):
self.endpoint_name = self.endpoints["by_id"][endpoint_id]
self.endpoint_id = endpoint_id
else:
self.endpoint_name = endpoint_id
self.endpoint_id = self.endpoints["by_name"][endpoint_id]
if not self.is_number(endpoint_id):
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
if not self.is_number(stack) and stack != "all":
#print(stack)
#print(self.endpoint_id)
stack = self.get_stack(stack,self.endpoint_id)["Id"]
if stack == "all":
stacks = self.get_stacks(self.endpoint_id)
paths = []
for s in stacks:
#print(f"Delete stack {s['Name']}")
#print(s['EndpointId'], endpoint_id)
if int(s['EndpointId']) != int(endpoint_id):
continue
#print("Deleting stack:", s['Name'])
path = f"/stacks/{s['Id']}"
if endpoint_id is not None:
path += f"?endpointId={endpoint_id}&removeVolumes=true"
paths.append([self.get_endpoint_name(endpoint_id),s['Name'],path])
#input(paths)
def delete(c):
print(f"Delete stack {c[1]} from {c[0]} ")
out = self.api_delete(c[2])
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(delete, paths)
return "Done"
else:
path = f"/stacks/{stack}"
if endpoint_id is not None:
path += f"?endpointId={endpoint_id}&removeVolumes=true"
# print(path)
try:
# print(path)
# print(base_url)
# print(token)
stacks = self.api_delete(path)
except Exception as e:
#print(f"Error creating stack: {e}")
if "Conflict for url" in str(e):
print("Stack with this name may already exist.")
else:
print(f"Error deleting stack: {e}")
#print(stacks)
return []
if stacks is None:
return []
return stacks
def refresh_status(self, stack, timeout=None):
pass
def __repr__(self):
pass