This commit is contained in:
2025-12-02 21:04:41 +01:00
parent c1fcb94962
commit c722b5f723
2 changed files with 296 additions and 237 deletions

386
port.py
View File

@@ -5,6 +5,9 @@ import uuid
import shutil
import time
import logging
import base64
import tabulate
from git import Repo
from concurrent.futures import ThreadPoolExecutor
@@ -129,7 +132,7 @@ class Portainer:
return resp.text
def api_post_file(self, path, endpoint_id, name, envs, file, timeout=120):
#input("API POST2 called. Press Enter to continue.")
# input("API POST2 called. Press Enter to continue.")
"""Example authenticated GET request to Portainer API."""
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
@@ -138,7 +141,7 @@ class Portainer:
"Name": name,
"Env": json.dumps(envs)
}
#print(data)
# print(data)
resp = requests.post(url, headers=headers, files=file, data=data, timeout=timeout)
resp.raise_for_status()
return resp.json()
@@ -146,7 +149,7 @@ class Portainer:
def api_post_no_body(self, path, timeout=120):
"""Example authenticated GET request to Portainer API."""
url = f"{self.base_url.rstrip('/')}{path}"
#print(url)
# print(url)
headers = {"X-API-Key": f"{self.token}"}
resp = requests.post(url, headers=headers, timeout=timeout)
return resp.text
@@ -156,11 +159,11 @@ class Portainer:
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
resp = requests.delete(url, headers=headers, timeout=timeout)
#print(resp)
# print(resp)
resp.raise_for_status()
#print(resp.status_code)
# print(resp.status_code)
return resp.status_code
def refresh(self):
self.get_endpoints()
self.get_stacks(self)
@@ -170,65 +173,65 @@ class Portainer:
if endpoint_id != "all":
endpoint_id = self.get_endpoint_id(endpoint_id)
path = "/stacks"
stcks = []
stcks = []
stacks = self.api_get(path, timeout=timeout)
self.stacks_all = {}
fail_endponts = [20,39,41]
#print(json.dumps(stacks,indent=2))
fail_endponts = [20, 39, 41]
# print(json.dumps(stacks,indent=2))
webhooks = {}
for s in stacks:
#print(type(s["AutoUpdate"]) )
#input(s)
# print(type(s["AutoUpdate"]) )
# input(s)
if s['EndpointId'] in fail_endponts:
continue
if not s['EndpointId'] in webhooks:
try:
webhooks[s['EndpointId']] = {"webhook":{}}
webhooks[self.endpoints["by_id"][s['EndpointId']]] = {"webhook":{}}
except:
pass
webhooks[s['EndpointId']] = {"webhook": {}}
webhooks[self.endpoints["by_id"][s['EndpointId']]] = {"webhook": {}}
except Exception as e:
logger.debug(f"Exception while getting webhooks for endpoint {s['EndpointId']}: {e}")
if not s['EndpointId'] in self.stacks_all:
self.stacks_all[s['EndpointId']] = {"by_id":{},"by_name":{}}
self.stacks_all[self.endpoints["by_id"][s['EndpointId']]] = {"by_id":{},"by_name":{}}
self.stacks_all[s['EndpointId']] = {"by_id": {}, "by_name": {}}
self.stacks_all[self.endpoints["by_id"][s['EndpointId']]] = {"by_id": {}, "by_name": {}}
self.stacks_all[s['EndpointId']]["by_id"][s['Id']] = s['Name']
self.stacks_all[self.endpoints["by_id"][s['EndpointId']]]["by_id"][s['Id']] = s['Name']
self.stacks_all[s['EndpointId']]["by_name"][s['Name']] = s['Id']
self.stacks_all[self.endpoints["by_id"][s['EndpointId']]]["by_name"][s['Name']] = s['Id']
#print(s)
# print(s)
if "AutoUpdate" in s and s["AutoUpdate"] is not None:
if type(s["AutoUpdate"]) == dict and "Webhook" in s["AutoUpdate"]:
#print(self.endpoints["by_id"][s['EndpointId']], s['Name'], s["AutoUpdate"]['Webhook'])
#print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW")
if type(s["AutoUpdate"]) is dict and "Webhook" in s["AutoUpdate"]:
# print(self.endpoints["by_id"][s['EndpointId']], s['Name'], s["AutoUpdate"]['Webhook'])
# print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW")
webhooks[s['EndpointId']][s['Name']] = s['AutoUpdate']['Webhook']
webhooks[self.endpoints["by_id"][s['EndpointId']]][s['Name']] = s['AutoUpdate']['Webhook']
elif s["AutoUpdate"]["Webhook"] != "":
elif s["AutoUpdate"]["Webhook"] != "":
webhooks[s['EndpointId']][s['Name']] = s['Webhook']
webhooks[self.endpoints["by_id"][s['EndpointId']]][s['Name']] = s['Webhook']
#print(self.stacks_all)
# print(self.stacks_all)
if s['EndpointId'] == endpoint_id or endpoint_id == "all":
stcks.append(s)
#print(stcks)
stcks.append(s)
# print(stcks)
if stcks is None:
return []
self.stacks = stacks
self.all_data["stacks"] = self.stacks_all
self.all_data["webhooks"] = webhooks
#input(json.dumps(self.stacks_all,indent=2))
# input(json.dumps(self.stacks_all,indent=2))
return stcks
def get_stack_id(self,endpoint,stack):
def get_stack_id(self, endpoint, stack):
pass
def update_status(self,endpoint,stack):
def update_status(self, endpoint, stack):
path = f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true"
#input(path)
# input(path)
stats = self.api_get(path)
print(stats)
def get_endpoint_id(self,endpoint):
def get_endpoint_id(self, endpoint):
if self.is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint]
@@ -237,8 +240,8 @@ class Portainer:
self.endpoint_name = endpoint
self.endpoint_id = self.endpoints["by_name"][endpoint]
return self.endpoints["by_name"][endpoint]
def get_endpoint_name(self,endpoint):
def get_endpoint_name(self, endpoint):
if self.is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint]
@@ -247,74 +250,77 @@ class Portainer:
self.endpoint_name = endpoint
self.endpoint_id = self.endpoints["by_name"][endpoint]
return endpoint
def get_containers(self, endpoint="all", stack="all", timeout=30):
#print(json.dumps(self.all_data,indent=2))
#print(endpoint)
#print(stack)
# print(json.dumps(self.all_data,indent=2))
# print(endpoint)
# print(stack)
cont = []
data = {}
if endpoint == "all":
for s in self.all_data["endpoints"]["by_id"]:
#print(s)
# print(s)
if stack == "all":
if not s in self.all_data["stacks"]:
if s not in self.all_data["stacks"]:
continue
if self.all_data["endpoints_status"][s] != 1:
#print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline")
# print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline")
continue
for e in self.all_data["stacks"][s]["by_name"]:
path = f"/endpoints/{s}/docker/containers/json?all=1&filters={{\"label\": [\"com.docker.compose.project={e}\"]}}"
path = (
f"/endpoints/{s}/docker/containers/json"
f"?all=1&filters={{\"label\": [\"com.docker.compose.project={e}\"]}}"
)
logging.info(f"request : {path}")
try:
containers = self.api_get(path)
except:
print(f"failed to get containers from {path}")
except Exception as e:
print(f"failed to get containers from {path}: {e}")
continue
contr = []
try:
for c in containers:
cont.append(c["Names"][0].replace("/",""))
contr.append(c["Names"][0].replace("/",""))
cont.append(c["Names"][0].replace("/", ""))
contr.append(c["Names"][0].replace("/", ""))
if self.all_data["endpoints"]["by_id"][s] in data:
data[self.all_data["endpoints"]["by_id"][s]][e] = contr
else:
data[self.all_data["endpoints"]["by_id"][s]] = {e:contr}
except:
print("lalalal")
#print(data)
data[self.all_data["endpoints"]["by_id"][s]] = {e: contr}
except Exception as e:
logger.debug(
f"Exception while getting containers for stack {e} ",
f"on endpoint {self.all_data['endpoints']['by_id'][s]}: {e}"
)
# print(data)
self.all_data["containers"] = data
else:
self.get_containers()
for i in self.all_data["containers"][endpoint][stack]:
cont.append(i)
return cont
def stop_containers(self, endpoint, containers, timeout=130):
if self.all_data["endpoints_status"][endpoint] != 1:
print(f"Endpoint {self.get_endpoint_name(endpoint)} is offline")
ep_id = self.endpoints["by_name"][endpoint]
def stop(c):
print(f" > Stopping {c}")
self.api_post_no_body(
f"/endpoints/{ep_id}/docker/containers/{c}/stop"
)
#print(f"✔")
# print(f"✔")
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(stop, containers)
# for c in containers:
# print(f" > Stopping {c}")
# self.api_post_no_body(f"/endpoints/{self.endpoints["by_name"][endpoint]}/docker/containers/{c}/stop")
# return 0
def start_containers(self, endpoint, containers, timeout=130):
ep_id = self.endpoints["by_name"][endpoint]
def stop(c):
print(f" > Starting {c}")
self.api_post_no_body(
@@ -322,42 +328,44 @@ class Portainer:
)
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(stop, containers)
def update_stack(self, endpoint, stack, autostart, timeout=130):
stcs = []
if stack == "all":
for s in self.all_data["webhooks"][endpoint]:
stcs.append([s,self.all_data["webhooks"][endpoint][s]])
stcs.append([s, self.all_data["webhooks"][endpoint][s]])
else:
stcs.append([stack, self.all_data["webhooks"][endpoint][stack]])
#input(stcs)
# input(stcs)
def update(c):
print(f" > Updating {c[0]} on {endpoint}")
ans = self.api_post_no_body(
f"/stacks/webhooks/{c[1]}"
)
logger.debug(f"Update response for stack {c[0]} on endpoint {endpoint}: {ans}")
def stop():
cont = []
for c in self.all_data["containers"][endpoint]:
if stack == c or stack== "all":
cont+=self.all_data["containers"][endpoint][c]
self.stop_containers(endpoint,cont)
if stack == c or stack == "all":
cont += self.all_data["containers"][endpoint][c]
self.stop_containers(endpoint, cont)
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(update, stcs)
if not autostart:
time.sleep(120)
cont = []
for c in self.all_data["containers"][endpoint]:
if stack == c or stack== "all":
cont+=self.all_data["containers"][endpoint][c]
self.stop_containers(endpoint,cont)
if stack == c or stack == "all":
cont += self.all_data["containers"][endpoint][c]
self.stop_containers(endpoint, cont)
def get_endpoints(self, timeout=10):
endpoints = self.api_get("/endpoints")
eps = {"by_id":{}, "by_name":{}}
eps = {"by_id": {}, "by_name": {}}
eps_stats = {}
for ep in endpoints:
eps['by_id'][ep['Id']] = ep['Name']
@@ -367,47 +375,60 @@ class Portainer:
self.endpoints = eps
self.all_data["endpoints"] = eps
self.all_data["endpoints_status"] = eps_stats
#input(eps_stats)
#input(eps)
# input(eps_stats)
# input(eps)
return eps
def get_endpoint(self, endpoint_id=None, timeout=30):
self.get_endpoints()
#print(self.endpoints)
# print(self.endpoints)
if self.is_number(endpoint_id):
self.endpoint_name = self.endpoints["by_id"][endpoint_id]
self.endpoint_id = endpoint_id
else:
self.endpoint_name = endpoint_id
self.endpoint_id = self.endpoints["by_name"][endpoint_id]
self.endpoint_id = self.endpoints["by_name"][endpoint_id]
return self.endpoint_id
def get_swarm_id(self, endpoint):
ep_id = self.endpoints["by_name"][endpoint]
path = f"/endpoints/{ep_id}/docker/info"
stats = self.api_get(path)
return stats['Swarm']['Cluster']['ID']
def get_stack(self, stack=None, endpoint_id=None, timeout=None):
self.get_stacks(endpoint_id)
if not self.is_number(endpoint_id):
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
self.stack_id = []
if stack == "all":
for s in self.stacks:
#print(s)
# print(s)
if (endpoint_id == s.get("EndpointId")):
self.stack_ids.append(s.get("Id"))
return self.stack_ids
else:
return self.stack_ids
else:
for s in self.stacks:
#print(s)
if (stack is not None and s.get("Id") == stack and endpoint_id == s.get("EndpointId")) or str(s.get("Name")) == str(stack) and endpoint_id == int(s.get("EndpointId")):
# print(s)
match_by_id = (
stack is not None
and s.get("Id") == stack
and endpoint_id == s.get("EndpointId")
)
match_by_name = (
str(s.get("Name")) == str(stack)
and endpoint_id == int(s.get("EndpointId")) # Ensure types match for comparison
)
if match_by_id or match_by_name:
# if (stack is not None and s.get("Id") == stack and endpoint_id == s.get("EndpointId"))
# or str(s.get("Name")) == str(stack) and endpoint_id == int(s.get("EndpointId")):
self.stack_id = s.get("Id")
self.stack_name = s.get("Name")
self.stack_ids.append(s.get("Id"))
return s
return s
raise ValueError(f"Stack not found: {stack}")
def create_stack(self, endpoint, stack=None, mode="git", autostart=False, swarm=False, timeout=None):
@@ -418,7 +439,7 @@ class Portainer:
else:
p = "standalone"
env_path = f"{self.repo_dir}/{stack}/.env"
#input(swarm_id)
# input(swarm_id)
self.endpoint_id = self.get_endpoint_id(endpoint)
if os.path.exists(self.repo_dir):
shutil.rmtree(self.repo_dir)
@@ -427,11 +448,9 @@ class Portainer:
Repo.clone_from(self.git_url, self.repo_dir)
if mode == "git":
path = f"/stacks/create/{p}/repository"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
if stack == "all":
if self.endpoint_name == "rack":
stacks = self.rack_stacks
@@ -443,52 +462,53 @@ class Portainer:
stacks = self.nas_stacks
else:
stacks = [stack]
#print(json.dumps(self.stacks_all, indent=2))
#input(json.dumps(self.stacks_all,indent=2))
# print(json.dumps(self.stacks_all, indent=2))
# input(json.dumps(self.stacks_all, indent=2))
for stack in stacks:
if self.endpoint_id in self.stacks_all:
if stack in self.stacks_all[self.endpoint_id]['by_id'] or stack in self.stacks_all[self.endpoint_id]['by_name']:
# Check if the stack exists by ID or name
stack_check = (
stack in self.stacks_all[self.endpoint_id]['by_id']
or stack in self.stacks_all[self.endpoint_id]['by_name']
)
if stack_check:
print(f"Stack {stack} already exist")
continue
print(f"Working on {stack}")
envs = []
if os.path.exists(f"{env_path}"):
f = open(f"{env_path}","r")
f = open(f"{env_path}", "r")
env_vars = f.read().splitlines()
for ev in env_vars:
if ev.startswith("#") or ev.strip() == "":
continue
if "=" in ev:
name, value = ev.split("=",1)
name, value = ev.split("=", 1)
envs.append({"name": name, "value": value})
f.close()
#wl(envs)
# wl(envs)
for e in envs:
#print(f"Env: {e['name']} = {e['value']}")
HWS = ["HW_MODE","HW_MODE1","HW_MODE2"]
# print(f"Env: {e['name']} = {e['value']}")
HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
if e['name'] == "RESTART" and self.endpoint_name == "m-server":
e['value'] = "always"
if e['name'] in HWS:
#print("Found HW_MODE env var.")
# print("Found HW_MODE env var.")
if self.hw_mode:
e['value'] = "hw"
else:
e['value'] = "cpu"
if e['name'] == "LOGGING":
#print("Found LOGGING env var.")
if self.log_mode:
e['value'] = "journald"
else:
e['value'] = "syslog"
# print("Found LOGGING env var.")
if self.log_mode:
e['value'] = "journald"
else:
e['value'] = "syslog"
uid = uuid.uuid4()
#print(uid)
uid = uuid.uuid4()
# print(uid)
req = {
"Name": stack,
"Env": envs,
@@ -510,7 +530,7 @@ class Portainer:
"supportRelativePath": True,
"repositoryAuthentication": True,
"fromAppTemplate": False,
"registries": [6,3],
"registries": [6, 3],
"FromAppTemplate": False,
"Namespace": "",
"CreatedByUserId": "",
@@ -518,7 +538,7 @@ class Portainer:
"filesystemPath": "/share/docker_data/portainer/portainer-data/",
"RegistryID": 4,
"isDetachedFromGit": True,
"method":"repository",
"method": "repository",
"swarmID": None
}
if swarm:
@@ -526,11 +546,11 @@ class Portainer:
req["swarmID"] = swarm_id
req["composeFile"] = f"__swarm/{stack}/{stack}-swarm.yml"
req["ConfigFilePath"] = f"__swarm/{stack}/{stack}-swarm.yml"
print(json.dumps(req))
res = self.api_post(path,req)
res = self.api_post(path, req)
if "Id" in res:
#print("Deploy request OK")
# print("Deploy request OK")
pass
else:
print(res)
@@ -538,9 +558,9 @@ class Portainer:
created = False
while True:
try:
#print(self.endpoint_id)
#print(stack)
stck2 = self.get_stack(stack, self.endpoint_id)
# print(self.endpoint_id)
# print(stack)
self.get_stack(stack, self.endpoint_id)
created = True
break
except Exception as e:
@@ -550,23 +570,24 @@ class Portainer:
if tries > 50:
print(f"Error retrieving stack {stack} after creation: {self.endpoint_name}")
break
logger.debug(f"Exception while getting stack {stack}: {e}")
if created:
if stack != "pihole":
#print(autostart)
# print(autostart)
if not autostart:
#self.get_stacks()
#self.stop_stack(stack,self.endpoint_id)
conts = self.get_containers(self.endpoint_name,stack)
#print(conts)
self.stop_containers(self.endpoint_name,conts)
# self.get_stacks()
# self.stop_stack(stack,self.endpoint_id)
conts = self.get_containers(self.endpoint_name, stack)
# print(conts)
self.stop_containers(self.endpoint_name, conts)
if mode == "file":
print("Creating new stack from file...")
path = "/stacks/create/standalone/file"
path = "/stacks/create/standalone/file"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
if stack == "all":
if self.endpoint_name == "rack":
stacks = self.rack_stacks
@@ -575,25 +596,25 @@ class Portainer:
elif self.endpoint_name == "rpi5":
stacks = self.rpi5_stacks
else:
stacks = [stack]
stacks = [stack]
for stack in stacks:
print(f"Working on {stack}")
if os.path.exists(f"{self.repo_dir}/{stack}/.env"):
f = open(f"{self.repo_dir}/{stack}/.env","r")
f = open(f"{self.repo_dir}/{stack}/.env", "r")
env_vars = f.read().splitlines()
envs = []
for ev in env_vars:
if ev.startswith("#") or ev.strip() == "":
continue
if "=" in ev:
name, value = ev.split("=",1)
name, value = ev.split("=", 1)
envs.append({"name": name, "value": value})
f.close()
#wl(envs)
# wl(envs)
for e in envs:
#print(f"Env: {e['name']} = {e['value']}")
HWS = ["HW_MODE","HW_MODE1","HW_MODE2"]
# print(f"Env: {e['name']} = {e['value']}")
HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"]
if e['name'] == "RESTART" and self.endpoint_name == "m-server":
e['value'] = "always"
if e['name'] in HWS:
@@ -603,51 +624,51 @@ class Portainer:
else:
e['value'] = "cpu"
if e['name'] == "LOGGING":
print("Found LOGGING env var.")
if self.log_mode:
e['value'] = "journald"
else:
e['value'] = "syslog"
print("Found LOGGING env var.")
if self.log_mode:
e['value'] = "journald"
else:
e['value'] = "syslog"
file = {
# ("filename", file_object)
"file": ("docker-compose.yml", open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb")),
}
self.api_post_file(path,self.endpoint_id,stack,envs,file)
self.api_post_file(path, self.endpoint_id, stack, envs, file)
def print_stacks(self,endpoint="all"):
def print_stacks(self, endpoint="all"):
stacks = self.get_stacks()
count = 0
lst = []
data = []
for stack in stacks:
if endpoint != None:
if not stack['EndpointId'] in self.endpoints['by_id']:
if endpoint is not None:
if not stack['EndpointId'] in self.endpoints['by_id']:
continue
if endpoint != "all":
if self.endpoints['by_name'][endpoint] != stack['EndpointId']:
if self.endpoints['by_name'][endpoint] != stack['EndpointId']:
continue
try:
data.append([stack['Id'], stack['Name'], self.endpoints['by_id'][stack['EndpointId']]])
except KeyError as e:
data.append([stack['Id'], stack['Name'], "?"])
logger.debug(f"KeyError getting endpoint name for stack {stack['Name']}: {e}")
count += 1
headers = ["StackID", "Name", "Endpoint"]
print(tabulate(data, headers=headers, tablefmt="github"))
print(f"Total stacks: {count}")
def start_stack(self,stack=None,endpoint_id=None):
if endpoint_id != None:
def start_stack(self, stack=None, endpoint_id=None):
if endpoint_id is not None:
print("Getting endpoint")
self.get_endpoint(endpoint_id)
if stack != None:
self.get_stack(stack,endpoint_id)
if stack is not None:
self.get_stack(stack, endpoint_id)
for stack in self.stack_ids:
path = f"/stacks/{stack}/start"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
try:
try:
resp = self.api_post_no_body(path, timeout=20)
except Exception as e:
print(f"Error stoping stack: {e}")
@@ -657,24 +678,24 @@ class Portainer:
else:
print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}")
return True
def stop_stack(self,stack,endpoint_id):
def stop_stack(self, stack, endpoint_id):
print(f"Stopping stack {stack}")
if endpoint_id != None:
if endpoint_id is not None:
self.get_endpoint(endpoint_id)
if stack == "all":
self.get_stack(stack,endpoint_id)
self.get_stack(stack, endpoint_id)
else:
if stack != None:
self.stack_ids = [self.get_stack(stack,endpoint_id)["Id"]]
if stack is not None:
self.stack_ids = [self.get_stack(stack, endpoint_id)["Id"]]
for stack in self.stack_ids:
path = f"/stacks/{stack}/stop"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
try:
path += f"?endpointId={self.endpoint_id}"
try:
resp = self.api_post_no_body(path, timeout=120)
except NameError as e:
print(f"Error stoping stack: {e}")
print(f"Error stopping stack: {e}")
return []
if "Id" in json.loads(resp):
print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : stopped")
@@ -692,37 +713,38 @@ class Portainer:
self.endpoint_id = endpoint_id
else:
self.endpoint_name = endpoint_id
self.endpoint_id = self.endpoints["by_name"][endpoint_id]
self.endpoint_id = self.endpoints["by_name"][endpoint_id]
if not self.is_number(endpoint_id):
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
if not self.is_number(stack) and stack != "all":
#print(stack)
#print(self.endpoint_id)
stack = self.get_stack(stack,self.endpoint_id)["Id"]
# print(stack)
# print(self.endpoint_id)
stack = self.get_stack(stack, self.endpoint_id)["Id"]
if stack == "all":
stacks = self.get_stacks(self.endpoint_id)
paths = []
for s in stacks:
#print(f"Delete stack {s['Name']}")
#print(s['EndpointId'], endpoint_id)
# print(f"Delete stack {s['Name']}")
# print(s['EndpointId'], endpoint_id)
if int(s['EndpointId']) != int(endpoint_id):
continue
#print("Deleting stack:", s['Name'])
# print("Deleting stack:", s['Name'])
path = f"/stacks/{s['Id']}"
if endpoint_id is not None:
path += f"?endpointId={endpoint_id}&removeVolumes=true"
paths.append([self.get_endpoint_name(endpoint_id),s['Name'],path])
#input(paths)
paths.append([self.get_endpoint_name(endpoint_id), s['Name'], path])
# input(paths)
def delete(c):
print(f"Delete stack {c[1]} from {c[0]} ")
out = self.api_delete(c[2])
logger.debug(f"Deleted stack {c[1]} from {c[0]}: {out}")
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(delete, paths)
return "Done"
return "Done"
else:
path = f"/stacks/{stack}"
@@ -735,16 +757,16 @@ class Portainer:
# print(token)
stacks = self.api_delete(path)
except Exception as e:
#print(f"Error creating stack: {e}")
# print(f"Error creating stack: {e}")
if "Conflict for url" in str(e):
print("Stack with this name may already exist.")
else:
print(f"Error deleting stack: {e}")
#print(stacks)
# print(stacks)
return []
if stacks is None:
return []
return stacks
def refresh_status(self, stack, timeout=None):
@@ -761,4 +783,4 @@ class Portainer:
"Name": name,
"Data": encoded
}
self.api_post(path, data, timeout=timeout)
self.api_post(path, data, timeout=timeout)

View File

@@ -3,15 +3,11 @@ import os
import sys
import requests
import json
import uuid
import argparse
import shutil
import time
from tabulate import tabulate
from git import Repo # pip install gitpython
from port import Portainer
import logging
VERSION="0.0.1"
VERSION = "0.0.1"
defaults = {
"endpoint_id": "vm01",
@@ -24,18 +20,21 @@ defaults = {
parser = argparse.ArgumentParser(description="Portainer helper - use env vars or pass credentials.")
parser.add_argument("--base", "-b", default=os.getenv("PORTAINER_URL", \
"https://portainer.example.com"),help="Base URL for Portainer (ENV: PORTAINER_URL)")
parser.add_argument("--base",
"-b",
default=os.getenv("PORTAINER_URL", "https://portainer.example.com"),
help="Base URL for Portainer (ENV: PORTAINER_URL)"
)
parser.add_argument("--site", "-t", type=str, default=None, help="Site")
parser.add_argument("--endpoint-id", "-e", type=str, default=None, help="Endpoint ID to limit stack operations")
parser.add_argument("--refresh-environment", "-R", action="store_true", help="List endpoints")
parser.add_argument("--list-endpoints","-E", action="store_true", help="List endpoints")
parser.add_argument("--list-endpoints", "-E", action="store_true", help="List endpoints")
parser.add_argument("--list-stacks", "-l", action="store_true", help="List stacks")
parser.add_argument("--print-all-data", "-A", action="store_true", help="List stacks")
parser.add_argument("--list-containers", "-c", action="store_true", help="List containers")
parser.add_argument("--update-stack", "-U", action="store_true", help="Update stacls")
parser.add_argument("--update-stack", "-U", action="store_true", help="Update stacks")
parser.add_argument("--stop-containers", "-O", action="store_true", help="Stop containers")
parser.add_argument("--start-containers", "-X", action="store_true", help="Stop containers")
parser.add_argument("--start-containers", "-X", action="store_true", help="Start containers")
parser.add_argument("--update-status", "-S", action="store_true", help="Update status")
parser.add_argument("--get-stack", metavar="NAME_OR_ID", help="Get stack by name or numeric id")
parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform")
@@ -44,17 +43,17 @@ parser.add_argument("--start-stack", "-x", action='store_true')
parser.add_argument("--stop-stack", "-o", action='store_true')
parser.add_argument("--secrets", "-q", action='store_true')
parser.add_argument("--debug", "-D", action='store_true')
parser.add_argument("--create-stack","-n", action='store_true')
parser.add_argument("--create-stack_new2","-N", action='store_true')
parser.add_argument("--gpu","-g", action='store_true')
parser.add_argument("--create-stacks","-C", action='store_true')
parser.add_argument("--refresh-status","-r", action='store_true')
parser.add_argument("--create-stack", "-n", action='store_true')
parser.add_argument("--create-stack_new2", "-N", action='store_true')
parser.add_argument("--gpu", "-g", action='store_true')
parser.add_argument("--create-stacks", "-C", action='store_true')
parser.add_argument("--refresh-status", "-r", action='store_true')
parser.add_argument("--stack", "-s", type=str, help="Stack ID for operations")
parser.add_argument("--token-only", action="store_true", help="Print auth token and exit")
parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds")
parser.add_argument("--deploy-mode","-m", type=str, default="git", help="Deploy mode")
parser.add_argument("--stack-mode","-w", default=None, help="Stack mode")
parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode")
parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode")
args = parser.parse_args()
print("Running version:", VERSION)
print("Environment:", args.site)
@@ -62,19 +61,39 @@ print("Environment:", args.site)
_LOG_LEVEL = "INFO"
LOG_FILE = "/tmp/portainer.log"
if _LOG_LEVEL == "DEBUG":
logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
logging.debug('using debug loging')
logging.basicConfig(
filename=LOG_FILE,
level=logging.DEBUG,
format='%(asctime)s : %(levelname)s : %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p'
)
logging.debug('using debug logging')
elif _LOG_LEVEL == "ERROR":
logging.basicConfig(filename=LOG_FILE, level=logging.ERROR, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
logging.info('using error loging')
logging.basicConfig(
filename=LOG_FILE,
level=logging.ERROR,
format='%(asctime)s : %(levelname)s : %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p'
)
logging.info('using error logging')
elif _LOG_LEVEL == "SCAN":
logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
logging.info('using error loging')
logging.basicConfig(
filename=LOG_FILE,
level=logging.DEBUG,
format='%(asctime)s : %(levelname)s : %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p'
)
logging.info('using scan logging')
else:
logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
logging.info("script started")
logger = logging.getLogger(__name__)
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format='%(asctime)s : %(levelname)s : %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p'
)
logging.info("script started")
logger = logging.getLogger(__name__)
if args.site == "portainer":
base = os.getenv("PORTAINER_URL", "https://portainer.sectorq.eu/api")
@@ -82,10 +101,13 @@ if args.site == "portainer":
else:
base = os.getenv("PORTAINER_URL", "https://port.sectorq.eu/api")
portainer_api_key = "ptr_/5RkMCT/j3BTaL32vMSDtXFi76yOXRKVFOrUtzMsl5Y="
def wl(msg):
if args.debug:
print(msg)
def is_number(s):
"""Check if the input string is a number."""
try:
@@ -95,7 +117,6 @@ def is_number(s):
return False
def get_portainer_token(base_url, username=None, password=None, timeout=10):
"""
Authenticate to Portainer and return a JWT token.
@@ -114,6 +135,8 @@ def get_portainer_token(base_url, username=None, password=None, timeout=10):
if not token:
raise ValueError(f"No token found in response: {data}")
return token
def prompt_missing_args(args, defaults, fields):
"""
fields = [("arg_name", "Prompt text")]
@@ -132,11 +155,27 @@ def prompt_missing_args(args, defaults, fields):
setattr(args, field, value)
return args
if __name__ == "__main__":
# Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below.
#token = get_portainer_token(base,"admin","l4c1j4yd33Du5lo") # or get_portainer_token(base, "admin", "secret")
if args.action == None:
actions = ["delete_stack","create_stack","stop_stack","start_stack","list_stacks","update_stack","secrets","print_all_data","list_endpoints","list_containers","stop_containers","start_containers","refresh_environment","refresh_status","update_status"]
# token = get_portainer_token(base,"admin","l4c1j4yd33Du5lo") # or get_portainer_token(base, "admin", "secret")
if args.action is None:
actions = ["delete_stack",
"create_stack",
"stop_stack",
"start_stack",
"list_stacks",
"update_stack",
"secrets",
"print_all_data",
"list_endpoints",
"list_containers",
"stop_containers",
"start_containers",
"refresh_environment",
"refresh_status",
"update_status"]
print("Possible actions: ")
i = 1
for a in actions:
@@ -144,15 +183,15 @@ if __name__ == "__main__":
i += 1
ans = input("\nSelect action to perform: ")
args.action = actions[int(ans)-1]
token = portainer_api_key
# Example: list endpoints
por = Portainer(base, token)
if args.action == "secrets":
if args.endpoint_id == None:
args.endpoint_id = input("Endpoint ID is required for creating secrets : ")
if args.endpoint_id is None:
args.endpoint_id = input("Endpoint ID is required for creating secrets : ")
secrets = {
"gitea_runner_registration_token": "8nmKqJhkvYwltmNfF2o9vs0tzo70ufHSQpVg6ymb",
"influxdb2-admin-token": "l4c1j4yd33Du5lo",
@@ -182,42 +221,40 @@ if __name__ == "__main__":
("stack_mode", "Stack mode (swarm or compose)"),
("deploy_mode", "Deploy mode (git or upload)")
])
por.create_stack(args.endpoint_id,args.stack, args.deploy_mode, args.autostart, args.stack_mode)
por.create_stack(args.endpoint_id, args.stack, args.deploy_mode, args.autostart, args.stack_mode)
sys.exit()
if args.action == "stop_stack":
if args.endpoint_id == None:
args.endpoint_id = input("Endpoint ID is required for stopping stacks : ")
if args.stack == None:
args.stack = input("Stack name or ID is required for stopping stacks : ")
por.stop_stack(args.stack,args.endpoint_id)
if args.endpoint_id is None:
args.endpoint_id = input("Endpoint ID is required for stopping stacks : ")
if args.stack is None:
args.stack = input("Stack name or ID is required for stopping stacks : ")
por.stop_stack(args.stack, args.endpoint_id)
sys.exit()
if args.action == "start_stack":
if args.endpoint_id == None:
args.endpoint_id = input("Endpoint ID is required for starting stacks : ")
if args.stack == None:
args.stack = input("Stack name or ID is required for starting stacks : ")
if args.endpoint_id is None:
args.endpoint_id = input("Endpoint ID is required for starting stacks : ")
if args.stack is None:
args.stack = input("Stack name or ID is required for starting stacks : ")
por.start_stack(args.stack, args.endpoint_id)
sys.exit()
if args.action == "list_stacks":
por.print_stacks(args.endpoint_id)
print(json.dumps(por.all_data,indent=2))
print(json.dumps(por.all_data, indent=2))
sys.exit()
if args.action == "list_containers":
print("Getting containers")
por.get_containers(args.endpoint_id,args.stack)
por.get_containers(args.endpoint_id, args.stack)
sys.exit()
if args.action == "update_stack":
print("Updating stacks")
autostart=True if args.autostart else False
por.update_stack(args.endpoint_id,args.stack,autostart)
sys.exit()
autostart = True if args.autostart else False
por.update_stack(args.endpoint_id, args.stack, autostart)
sys.exit()
if args.action == "print_all_data":
print(json.dumps(por.all_data, indent=2))
sys.exit()