import argparse
import json
import logging
import os
import shutil
import time
import uuid
from concurrent.futures import ThreadPoolExecutor

import requests
from git import Repo  # pip install gitpython
from tabulate import tabulate

logger = logging.getLogger(__name__)


class Portainer:
    """Small client for the Portainer HTTP API.

    The constructor stores the connection settings and immediately queries
    the server for endpoints, stacks and containers, caching the results in
    ``self.all_data``. The remaining methods create, start, stop, update and
    delete stacks and containers through that API.

    Endpoints may be passed to most methods either as a numeric id (``int``
    or numeric string) or as the endpoint name.
    """

    # Stacks that belong to these endpoint ids are ignored when indexing.
    _SKIPPED_ENDPOINT_IDS = (20, 39, 41)
    # Names of the .env variables that select hardware/CPU mode.
    _HW_ENV_NAMES = ("HW_MODE", "HW_MODE1", "HW_MODE2")

    def __init__(self, base_url, token, timeout=10):
        """Store settings and load endpoints, stacks and containers.

        Args:
            base_url: Base URL of the Portainer API (trailing ``/`` is removed).
            token: API key sent in the ``X-API-Key`` header.
            timeout: Stored on the instance; the request helpers use their
                own per-call ``timeout`` arguments.
        """
        self.base_url = base_url.rstrip('/')
        self.token = token
        self.timeout = timeout
        self.stack_name = None
        self.stacks_all = {}
        self.stacks = []
        self.stack_id = None
        self.stack_ids = []
        self.endpoint_name = None
        self.endpoint_id = None
        self.endpoints = {"by_id": {}, "by_name": {}}
        # self.git_url = "https://gitlab.sectorq.eu/home/docker-compose.git"
        self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
        self.repo_dir = "/tmp/docker-compose"
        self.basic_stacks = [
            "pihole", "nginx", "mosquitto", "webhub", "authentik",
            "bitwarden", "mailu3", "home-assistant", "homepage",
        ]
        self.nas_stacks = self.basic_stacks + [
            "gitlab", "bookstack", "dockermon", "gitea", "grafana",
            "immich", "jupyter", "kestra", "mealie",
        ]
        self.m_server_stacks = self.basic_stacks + [
            'immich', 'zabbix-server', 'gitea', 'unifibrowser', 'mediacenter',
            'watchtower', 'wazuh', 'octoprint', 'motioneye', 'kestra',
            'bookstack', 'wud', 'uptime-kuma', 'registry', 'regsync',
            'dockermon', 'grafana', 'nextcloud', 'semaphore', 'node-red',
            'test', 'jupyter', 'paperless', 'mealie', 'n8n', 'ollama',
            'rancher',
        ]
        self.rpi5_stacks = self.basic_stacks + ["gitlab", "bookstack", "gitea"]
        self.rack_stacks = self.basic_stacks + [
            "gitlab", "bookstack", "dockermon", "gitea", "grafana",
            "immich", "jupyter", "kestra", "mealie",
        ]
        self.log_mode = False
        self.hw_mode = False
        self.all_data = {
            "containers": {},
            "stacks": {},
            "endpoints": {},
            "endpoints_status": {},
            "webhooks": {},
        }
        # Order matters: stacks are indexed by endpoint name, and containers
        # are listed per indexed stack.
        self.get_endpoints()
        self.get_stacks()
        self.get_containers()

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    def is_number(self, s):
        """Return True when ``s`` can be converted with ``float()``.

        ``None`` and other non-convertible objects yield False instead of
        raising.
        """
        try:
            float(s)
        except (TypeError, ValueError):
            return False
        return True

    def _resolve_endpoint(self, endpoint):
        """Return ``(endpoint_id, endpoint_name)`` for an id or a name.

        Numeric input (``int`` or numeric string) is normalised to ``int`` so
        it matches the integer keys of ``self.endpoints['by_id']``.

        Raises:
            KeyError: If the endpoint is not known.
        """
        try:
            ep_id = int(endpoint)
        except (TypeError, ValueError):
            return self.endpoints["by_name"][endpoint], endpoint
        return ep_id, self.endpoints["by_id"][ep_id]

    @staticmethod
    def _stack_webhook(stack):
        """Return the update-webhook id of a stack record, or None.

        Only stacks whose ``AutoUpdate`` field is set are considered. The
        id is taken from ``AutoUpdate['Webhook']`` when present, otherwise
        from the stack's top-level ``Webhook`` field.
        """
        auto_update = stack.get("AutoUpdate")
        if auto_update is None:
            return None
        if isinstance(auto_update, dict) and "Webhook" in auto_update:
            hook = auto_update["Webhook"]
        else:
            hook = stack.get("Webhook")
        return hook or None

    # ------------------------------------------------------------------
    # Raw HTTP helpers
    # ------------------------------------------------------------------
    def api_get(self, path, timeout=120):
        """GET ``path`` and return the decoded JSON body.

        Raises:
            requests.HTTPError: On a non-2xx response.
        """
        url = f"{self.base_url.rstrip('/')}{path}"
        headers = {"X-API-Key": f"{self.token}"}
        resp = requests.get(url, headers=headers, timeout=timeout)
        resp.raise_for_status()
        return resp.json()

    def api_post(self, path, json="", timeout=120):
        """POST ``json`` as the JSON body to ``path`` and return the raw text.

        The HTTP status is not checked; callers inspect the returned text.
        """
        url = f"{self.base_url.rstrip('/')}{path}"
        headers = {"X-API-Key": f"{self.token}"}
        resp = requests.post(url, headers=headers, json=json, timeout=timeout)
        return resp.text

    def api_post_file(self, path, endpoint_id, name, envs, file, timeout=120):
        """POST a multipart form with a file upload and return decoded JSON.

        Args:
            path: API path to post to.
            endpoint_id: Sent as the ``EndpointId`` form field.
            name: Sent as the ``Name`` form field.
            envs: Environment list, JSON-encoded into the ``Env`` form field.
            file: ``files`` mapping passed straight to ``requests.post``.
            timeout: Request timeout in seconds.

        Raises:
            requests.HTTPError: On a non-2xx response.
        """
        url = f"{self.base_url.rstrip('/')}{path}"
        headers = {"X-API-Key": f"{self.token}"}
        data = {
            "EndpointId": endpoint_id,
            "Name": name,
            "Env": json.dumps(envs),
        }
        resp = requests.post(
            url, headers=headers, files=file, data=data, timeout=timeout
        )
        resp.raise_for_status()
        return resp.json()

    def api_post_no_body(self, path, timeout=120):
        """POST to ``path`` without a body and return the raw response text."""
        url = f"{self.base_url.rstrip('/')}{path}"
        headers = {"X-API-Key": f"{self.token}"}
        resp = requests.post(url, headers=headers, timeout=timeout)
        return resp.text

    def api_delete(self, path, timeout=120):
        """DELETE ``path`` and return the HTTP status code.

        Raises:
            requests.HTTPError: On a non-2xx response.
        """
        url = f"{self.base_url.rstrip('/')}{path}"
        headers = {"X-API-Key": f"{self.token}"}
        resp = requests.delete(url, headers=headers, timeout=timeout)
        resp.raise_for_status()
        return resp.status_code

    # ------------------------------------------------------------------
    # Cached inventory
    # ------------------------------------------------------------------
    def refresh(self):
        """Reload endpoints, stacks and containers from the server."""
        self.get_endpoints()
        self.get_stacks()
        self.get_containers()

    def get_stacks(self, endpoint_id="all", timeout=10):
        """Fetch all stacks, rebuild the stack/webhook indexes, return a subset.

        ``self.stacks_all`` and ``self.all_data['webhooks']`` are keyed by
        both endpoint id and endpoint name. Stacks on the skipped endpoint ids
        are left out of the indexes and of the returned list; ``self.stacks``
        keeps the unfiltered server response.

        Args:
            endpoint_id: ``"all"``, or an endpoint id/name to filter on.
            timeout: Request timeout in seconds.

        Returns:
            The list of stack records for the requested endpoint(s).
        """
        if endpoint_id != "all":
            endpoint_id = self.get_endpoint_id(endpoint_id)
        stacks = self.api_get("/stacks", timeout=timeout)
        names_by_id = self.endpoints["by_id"]
        index = {}
        webhooks = {}
        selected = []
        for record in stacks:
            ep_id = record['EndpointId']
            if ep_id in self._SKIPPED_ENDPOINT_IDS:
                continue
            # Index under the id and, when the endpoint is known, its name.
            keys = [ep_id]
            ep_name = names_by_id.get(ep_id)
            if ep_name is not None:
                keys.append(ep_name)
            hook = self._stack_webhook(record)
            for key in keys:
                entry = index.setdefault(key, {"by_id": {}, "by_name": {}})
                entry["by_id"][record['Id']] = record['Name']
                entry["by_name"][record['Name']] = record['Id']
                hooks = webhooks.setdefault(key, {})
                if hook is not None:
                    hooks[record['Name']] = hook
            if endpoint_id == "all" or ep_id == endpoint_id:
                selected.append(record)
        self.stacks_all = index
        self.stacks = stacks
        self.all_data["stacks"] = index
        self.all_data["webhooks"] = webhooks
        return selected

    def get_stack_id(self, endpoint, stack):
        """Placeholder; not implemented (returns None)."""
        pass

    def update_status(self, endpoint, stack):
        """Request a refreshed image status for a stack, print and return it."""
        stack_id = self.all_data['stacks'][endpoint]['by_name'][stack]
        path = f"/stacks/{stack_id}/images_status?refresh=true"
        stats = self.api_get(path)
        print(stats)
        return stats

    def get_endpoint_id(self, endpoint):
        """Resolve an endpoint to its id and remember it as the current one.

        Sets ``self.endpoint_id`` and ``self.endpoint_name``.
        """
        self.endpoint_id, self.endpoint_name = self._resolve_endpoint(endpoint)
        return self.endpoint_id

    def get_endpoint_name(self, endpoint):
        """Resolve an endpoint to its name and remember it as the current one.

        Sets ``self.endpoint_id`` and ``self.endpoint_name``.
        """
        self.endpoint_id, self.endpoint_name = self._resolve_endpoint(endpoint)
        return self.endpoint_name

    def get_containers(self, endpoint="all", stack="all", timeout=30):
        """Refresh the container cache and return matching container names.

        Containers are listed per indexed stack on every endpoint whose
        status is 1, using the ``com.docker.compose.project`` label filter.
        The result is cached in ``self.all_data['containers']`` as
        ``{endpoint_name: {stack_name: [container_name, ...]}}``.

        Args:
            endpoint: ``"all"`` or an endpoint id/name.
            stack: ``"all"`` or a stack name.
            timeout: Accepted for interface compatibility; the requests use
                the default timeout of ``api_get``.

        Returns:
            Container names for the requested endpoint/stack; an empty list
            when nothing matches.
        """
        status = self.all_data.get("endpoints_status", {})
        data = {}
        for ep_id, ep_name in self.all_data["endpoints"]["by_id"].items():
            ep_stacks = self.all_data["stacks"].get(ep_id)
            if ep_stacks is None or status.get(ep_id) != 1:
                continue
            for stack_name in ep_stacks["by_name"]:
                path = (
                    f"/endpoints/{ep_id}/docker/containers/json?all=1"
                    f"&filters={{\"label\": "
                    f"[\"com.docker.compose.project={stack_name}\"]}}"
                )
                logger.info("request : %s", path)
                try:
                    containers = self.api_get(path)
                    names = [
                        c["Names"][0].replace("/", "") for c in containers
                    ]
                except (requests.RequestException, ValueError, KeyError,
                        IndexError, TypeError) as err:
                    # Best effort: one failing endpoint/stack must not stop
                    # the whole inventory.
                    print(f"failed to get containers from {path}: {err}")
                    continue
                data.setdefault(ep_name, {})[stack_name] = names
        self.all_data["containers"] = data

        if endpoint == "all":
            per_endpoint = list(data.values())
        else:
            try:
                _, wanted_name = self._resolve_endpoint(endpoint)
            except KeyError:
                wanted_name = endpoint
            per_endpoint = [data.get(wanted_name, {})]

        found = []
        for per_stack in per_endpoint:
            for stack_name, names in per_stack.items():
                if stack == "all" or stack_name == stack:
                    found.extend(names)
        return found

    # ------------------------------------------------------------------
    # Container operations
    # ------------------------------------------------------------------
    def _container_action(self, endpoint, containers, action, label, timeout):
        """Run a docker ``start``/``stop`` on containers in parallel.

        Does nothing except print a notice when the endpoint status is not 1.
        Failures of individual requests are printed, not raised.
        """
        ep_id, ep_name = self._resolve_endpoint(endpoint)
        if self.all_data["endpoints_status"].get(ep_id) != 1:
            print(f"Endpoint {ep_name} is offline")
            return

        def run(container):
            print(f" > {label} {container}")
            try:
                self.api_post_no_body(
                    f"/endpoints/{ep_id}/docker/containers/{container}/{action}",
                    timeout=timeout,
                )
            except requests.RequestException as err:
                print(f" > {label} {container} failed: {err}")

        with ThreadPoolExecutor(max_workers=10) as exe:
            # Consume the iterator so every task is waited for.
            list(exe.map(run, containers))

    def stop_containers(self, endpoint, containers, timeout=130):
        """Stop the given containers on ``endpoint`` (id or name)."""
        self._container_action(endpoint, containers, "stop", "Stopping", timeout)

    def start_containers(self, endpoint, containers, timeout=130):
        """Start the given containers on ``endpoint`` (id or name)."""
        self._container_action(endpoint, containers, "start", "Starting", timeout)

    def update_stack(self, endpoint, stack, autostart, timeout=130):
        """Trigger the update webhook of one stack or all stacks.

        Args:
            endpoint: Endpoint key as used in ``self.all_data`` (the container
                cache is keyed by endpoint name).
            stack: Stack name, or ``"all"`` for every stack with a webhook.
            autostart: When false, wait 120 seconds after triggering the
                webhooks and then stop the containers of the stack(s).
            timeout: Timeout in seconds for each webhook request.
        """
        hooks = self.all_data.get("webhooks", {}).get(endpoint, {})
        if stack == "all":
            targets = [(name, hook) for name, hook in hooks.items() if hook]
        else:
            hook = hooks.get(stack)
            if not hook:
                print(f"No update webhook registered for stack {stack} on {endpoint}")
                return
            targets = [(stack, hook)]

        def update(target):
            name, hook_id = target
            print(f" > Updating {name} on {endpoint}")
            try:
                self.api_post_no_body(
                    f"/stacks/webhooks/{hook_id}", timeout=timeout
                )
            except requests.RequestException as err:
                print(f" > Updating {name} on {endpoint} failed: {err}")

        with ThreadPoolExecutor(max_workers=10) as exe:
            list(exe.map(update, targets))

        if not autostart:
            # Give the redeploy time to bring the containers up before
            # stopping them again.
            time.sleep(120)
            cached = self.all_data["containers"].get(endpoint, {})
            to_stop = []
            for stack_name, names in cached.items():
                if stack == "all" or stack == stack_name:
                    to_stop += names
            self.stop_containers(endpoint, to_stop)

    # ------------------------------------------------------------------
    # Endpoints
    # ------------------------------------------------------------------
    def get_endpoints(self, timeout=10):
        """Fetch all endpoints and rebuild the id/name and status indexes.

        ``self.all_data['endpoints_status']`` is keyed by both id and name.
        ``timeout`` is accepted for interface compatibility; the request uses
        the default timeout of ``api_get``.

        Returns:
            ``{"by_id": {id: name}, "by_name": {name: id}}``.
        """
        endpoints = self.api_get("/endpoints")
        eps = {"by_id": {}, "by_name": {}}
        eps_stats = {}
        for ep in endpoints:
            eps['by_id'][ep['Id']] = ep['Name']
            eps['by_name'][ep['Name']] = ep['Id']
            eps_stats[ep['Id']] = ep['Status']
            eps_stats[ep['Name']] = ep['Status']
        self.endpoints = eps
        self.all_data["endpoints"] = eps
        self.all_data["endpoints_status"] = eps_stats
        return eps

    def get_endpoint(self, endpoint_id=None, timeout=30):
        """Reload endpoints, select ``endpoint_id`` (id or name), return its id."""
        self.get_endpoints()
        self.endpoint_id, self.endpoint_name = self._resolve_endpoint(endpoint_id)
        return self.endpoint_id

    def get_swarm_id(self, endpoint):
        """Return the swarm cluster id reported by the endpoint's docker info."""
        ep_id, _ = self._resolve_endpoint(endpoint)
        stats = self.api_get(f"/endpoints/{ep_id}/docker/info")
        return stats['Swarm']['Cluster']['ID']

    # ------------------------------------------------------------------
    # Stacks
    # ------------------------------------------------------------------
    def get_stack(self, stack=None, endpoint_id=None, timeout=None):
        """Look up one stack (by id or name) or all stacks of an endpoint.

        ``self.stack_ids`` is reset on every call so it only holds the ids
        found by this lookup.

        Args:
            stack: Stack id, stack name, or ``"all"``.
            endpoint_id: Endpoint id or name the stack must belong to.
            timeout: Unused.

        Returns:
            For ``"all"``: the list of stack ids on the endpoint.
            Otherwise: the matching stack record.

        Raises:
            ValueError: If no matching stack exists.
        """
        ep_id, _ = self._resolve_endpoint(endpoint_id)
        self.get_stacks(ep_id)
        self.stack_id = None
        self.stack_ids = []
        if stack == "all":
            self.stack_ids = [
                s.get("Id") for s in self.stacks if s.get("EndpointId") == ep_id
            ]
            return self.stack_ids
        wanted = str(stack)
        for s in self.stacks:
            if s.get("EndpointId") != ep_id:
                continue
            if str(s.get("Id")) == wanted or str(s.get("Name")) == wanted:
                self.stack_id = s.get("Id")
                self.stack_name = s.get("Name")
                self.stack_ids.append(s.get("Id"))
                return s
        raise ValueError(f"Stack not found: {stack}")

    def _default_stacks(self):
        """Return the predefined stack list for the current endpoint name."""
        presets = {
            "rack": self.rack_stacks,
            "m-server": self.m_server_stacks,
            "rpi5": self.rpi5_stacks,
            "nas": self.nas_stacks,
        }
        try:
            return presets[self.endpoint_name]
        except KeyError:
            raise ValueError(
                f"No predefined stack list for endpoint {self.endpoint_name}"
            ) from None

    @staticmethod
    def _read_env_file(env_path):
        """Parse a ``NAME=value`` file into Portainer's env list format.

        Blank lines, lines starting with ``#`` and lines without ``=`` are
        skipped. A missing file yields an empty list.
        """
        if not os.path.exists(env_path):
            return []
        envs = []
        with open(env_path, "r", encoding="utf-8") as fh:
            for line in fh.read().splitlines():
                if line.startswith("#") or line.strip() == "":
                    continue
                if "=" in line:
                    name, value = line.split("=", 1)
                    envs.append({"name": name, "value": value})
        return envs

    def _apply_env_overrides(self, envs, verbose=False):
        """Rewrite RESTART / HW_MODE* / LOGGING entries in place."""
        for env in envs:
            if env['name'] == "RESTART" and self.endpoint_name == "m-server":
                env['value'] = "always"
            if env['name'] in self._HW_ENV_NAMES:
                if verbose:
                    print("Found HW_MODE env var.")
                env['value'] = "hw" if self.hw_mode else "cpu"
            if env['name'] == "LOGGING":
                if verbose:
                    print("Found LOGGING env var.")
                env['value'] = "journald" if self.log_mode else "syslog"

    def _git_stack_request(self, name, envs, swarm_id):
        """Build the request body for creating stack ``name`` from the repo.

        ``swarm_id`` is None for a standalone deployment; otherwise the swarm
        compose file under ``__swarm/`` is used.
        """
        req = {
            "Name": name,
            "Env": envs,
            "AdditionalFiles": [],
            "AutoUpdate": {
                "forcePullImage": True,
                "forceUpdate": False,
                "webhook": f"{uuid.uuid4()}",
            },
            "repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git",
            "ReferenceName": "refs/heads/main",
            "composeFile": f"{name}/docker-compose.yml",
            "ConfigFilePath": f"{name}/docker-compose.yml",
            "repositoryAuthentication": True,
            # SECURITY: credentials should come from the environment. The
            # literal fallbacks keep existing deployments working; the token
            # is committed to source control and should be rotated and removed.
            "repositoryUsername": os.environ.get(
                "PORTAINER_GIT_USERNAME", "jaydee"
            ),
            "repositoryPassword": os.environ.get(
                "PORTAINER_GIT_PASSWORD", "glpat-uj-n-eEfTY398PE4vKSS"
            ),
            "AuthorizationType": 0,
            "TLSSkipVerify": False,
            "supportRelativePath": True,
            "fromAppTemplate": False,
            "registries": [6, 3],
            "FromAppTemplate": False,
            "Namespace": "",
            "CreatedByUserId": "",
            "Webhook": "",
            "filesystemPath": "/share/docker_data/portainer/portainer-data/",
            "RegistryID": 4,
            "isDetachedFromGit": True,
            "method": "repository",
        }
        if swarm_id is not None:
            req["type"] = "swarm"
            req["swarmID"] = swarm_id
            req["composeFile"] = f"__swarm/{name}/{name}-swarm.yml"
            req["ConfigFilePath"] = f"__swarm/{name}/{name}-swarm.yml"
        return req

    def _wait_for_stack(self, name):
        """Poll until stack ``name`` is visible on the current endpoint.

        Tries 51 times with a 10 second pause after each failed attempt.

        Returns:
            True when the stack was found, False after the last attempt.
        """
        for attempt in range(51):
            try:
                self.get_stack(name, self.endpoint_id)
                return True
            except (ValueError, KeyError, requests.RequestException):
                print(
                    f"Waiting for stack {name} to be created...{attempt}/50",
                    end="\r",
                )
                time.sleep(10)
        print(f"Error retrieving stack {name} after creation: {self.endpoint_name}")
        return False

    def _create_stacks_from_git(self, stacks, autostart, ep_mode):
        """Create each stack from the git repository via the API."""
        print("Creating new stack from git repo...")
        swarm = ep_mode == "swarm"
        enviro = "swarm" if swarm else "standalone"
        path = f"/stacks/create/{enviro}/repository"
        if self.endpoint_id is not None:
            path += f"?endpointId={self.endpoint_id}"
        print(path)
        # Only swarm deployments need (and can provide) a cluster id.
        swarm_id = self.get_swarm_id(self.endpoint_id) if swarm else None

        for name in stacks:
            # Looked up per iteration: the index is rebuilt while polling.
            existing = self.stacks_all.get(self.endpoint_id)
            if existing is not None and (
                name in existing['by_id'] or name in existing['by_name']
            ):
                print(f"Stack {name} already exist")
                continue
            print(f"Working on {name}")
            env_dir = f"__swarm/{name}" if swarm else f"{name}"
            envs = self._read_env_file(f"{self.repo_dir}/{env_dir}/.env")
            self._apply_env_overrides(envs)
            req = self._git_stack_request(name, envs, swarm_id)
            # Never echo the repository password.
            print(json.dumps({**req, "repositoryPassword": "***"}))
            res = self.api_post(path, req)
            if "Id" not in res:
                print(res)
            created = self._wait_for_stack(name)
            if created and name != "pihole" and not autostart:
                conts = self.get_containers(self.endpoint_name, name)
                self.stop_containers(self.endpoint_name, conts)

    def _create_stacks_from_files(self, stacks):
        """Create each stack by uploading its compose file from the clone."""
        print("Creating new stack from file...")
        path = "/stacks/create/standalone/file"
        if self.endpoint_id is not None:
            path += f"?endpointId={self.endpoint_id}"
        for name in stacks:
            print(f"Working on {name}")
            envs = self._read_env_file(f"{self.repo_dir}/{name}/.env")
            self._apply_env_overrides(envs, verbose=True)
            compose_path = f"{self.repo_dir}/{name}/docker-compose.yml"
            with open(compose_path, "rb") as fh:
                # ("filename", file_object)
                upload = {"file": ("docker-compose.yml", fh)}
                self.api_post_file(path, self.endpoint_id, name, envs, upload)

    def create_stack(self, endpoint, stack=None, mode="git", autostart=False,
                     ep_mode='swarm', timeout=None):
        """Create one stack, or the endpoint's predefined set of stacks.

        The compose repository is re-cloned into ``self.repo_dir`` first
        (an existing directory is removed).

        Args:
            endpoint: Endpoint id or name to deploy to.
            stack: Stack name, or ``"all"`` for the predefined list of the
                endpoint (``rack``, ``m-server``, ``rpi5``, ``nas``).
            mode: ``"git"`` to let Portainer pull from the repository, or
                ``"file"`` to upload the compose file from the local clone.
            autostart: In git mode, when false the containers of a freshly
                created stack (except ``pihole``) are stopped again.
            ep_mode: ``'swarm'`` for a swarm deployment; any other value
                deploys as standalone (git mode only).
            timeout: Unused.

        Raises:
            ValueError: If ``stack == "all"`` and the endpoint has no
                predefined stack list.
        """
        self.endpoint_id = self.get_endpoint_id(endpoint)
        if os.path.exists(self.repo_dir):
            shutil.rmtree(self.repo_dir)
            print(f"Folder '{self.repo_dir}' has been removed.")
        else:
            print(f"Folder '{self.repo_dir}' does not exist.")
        Repo.clone_from(self.git_url, self.repo_dir)

        stacks = self._default_stacks() if stack == "all" else [stack]
        if mode == "git":
            self._create_stacks_from_git(stacks, autostart, ep_mode)
        if mode == "file":
            self._create_stacks_from_files(stacks)

    def print_stacks(self, endpoint="all"):
        """Print a table of stacks, optionally limited to one endpoint name.

        With ``endpoint=None`` every stack is listed, including those on
        endpoints that are not known (shown as ``?``).
        """
        names_by_id = self.endpoints['by_id']
        rows = []
        for record in self.get_stacks():
            ep_id = record['EndpointId']
            if endpoint is not None:
                if ep_id not in names_by_id:
                    continue
                if endpoint != "all" and self.endpoints['by_name'][endpoint] != ep_id:
                    continue
            rows.append([record['Id'], record['Name'], names_by_id.get(ep_id, "?")])
        headers = ["StackID", "Name", "Endpoint"]
        print(tabulate(rows, headers=headers, tablefmt="github"))
        print(f"Total stacks: {len(rows)}")

    def _set_stack_state(self, action, verb, done_label, timeout):
        """POST ``/stacks/<id>/<action>`` for every id in ``self.stack_ids``.

        Returns:
            True when all requests were sent, ``[]`` as soon as one request
            fails at the transport level.
        """
        for stack_id in self.stack_ids:
            path = f"/stacks/{stack_id}/{action}"
            if self.endpoint_id is not None:
                path += f"?endpointId={self.endpoint_id}"
            try:
                resp = self.api_post_no_body(path, timeout=timeout)
            except requests.RequestException as err:
                print(f"Error {verb} stack: {err}")
                return []
            known = self.stacks_all.get(self.endpoint_id, {}).get('by_id', {})
            name = known.get(stack_id, stack_id)
            try:
                body = json.loads(resp)
            except ValueError:
                body = None
            if isinstance(body, dict) and "Id" in body:
                print(f"Stack {name} : {done_label}")
            elif isinstance(body, dict):
                print(f"Stack {name} : {body.get('message', resp)}")
            else:
                # Empty or non-JSON answer: show it verbatim.
                print(f"Stack {name} : {resp}")
        return True

    def start_stack(self, stack=None, endpoint_id=None):
        """Start one stack, or all stacks of an endpoint (``stack="all"``).

        With ``stack=None`` the ids currently in ``self.stack_ids`` are used.

        Returns:
            True on completion, ``[]`` if a request could not be sent.
        """
        if endpoint_id is not None:
            print("Getting endpoint")
            self.get_endpoint(endpoint_id)
        if stack is not None:
            target = endpoint_id if endpoint_id is not None else self.endpoint_id
            self.get_stack(stack, target)
        return self._set_stack_state("start", "starting", "started", 20)

    def stop_stack(self, stack, endpoint_id):
        """Stop one stack, or all stacks of an endpoint (``stack="all"``).

        With ``stack=None`` the ids currently in ``self.stack_ids`` are used.

        Returns:
            True on completion, ``[]`` if a request could not be sent.
        """
        print(f"Stopping stack {stack}")
        if endpoint_id is not None:
            self.get_endpoint(endpoint_id)
        if stack is not None:
            target = endpoint_id if endpoint_id is not None else self.endpoint_id
            self.get_stack(stack, target)
        return self._set_stack_state("stop", "stopping", "stopped", 120)

    def delete_stack(self, endpoint_id=None, stack=None, timeout=None):
        """Delete one stack, or every stack of an endpoint, with its volumes.

        Args:
            endpoint_id: Endpoint id or name.
            stack: Stack id, stack name, or ``"all"``.
            timeout: Unused.

        Returns:
            ``"Done"`` for ``stack="all"``; otherwise the HTTP status code of
            the delete request, or ``[]`` when the request failed.
        """
        self.get_endpoints()
        self.endpoint_id, self.endpoint_name = self._resolve_endpoint(endpoint_id)
        ep_id = self.endpoint_id
        query = f"?endpointId={ep_id}&removeVolumes=true"

        if stack == "all":
            targets = [
                (self.endpoint_name, s['Name'], f"/stacks/{s['Id']}{query}")
                for s in self.get_stacks(ep_id)
                if int(s['EndpointId']) == ep_id
            ]

            def delete(target):
                ep_name, stack_name, path = target
                print(f"Delete stack {stack_name} from {ep_name} ")
                try:
                    self.api_delete(path)
                except requests.RequestException as err:
                    print(f"Error deleting stack: {err}")

            with ThreadPoolExecutor(max_workers=10) as exe:
                list(exe.map(delete, targets))
            return "Done"

        if not self.is_number(stack):
            stack = self.get_stack(stack, ep_id)["Id"]
        try:
            return self.api_delete(f"/stacks/{stack}{query}")
        except requests.RequestException as err:
            if "Conflict for url" in str(err):
                print("Stack with this name may already exist.")
            else:
                print(f"Error deleting stack: {err}")
            return []

    def refresh_status(self, stack, timeout=None):
        """Placeholder; not implemented (returns None)."""
        pass

    def __repr__(self):
        """Return a debug representation; the API token is never included."""
        base_url = getattr(self, "base_url", None)
        endpoint = getattr(self, "endpoint_name", None)
        return f"{type(self).__name__}(base_url={base_url!r}, endpoint={endpoint!r})"