This commit is contained in:
2025-12-04 13:21:20 +01:00
parent 48e8b36646
commit 5fb1e8ade4
2 changed files with 170 additions and 111 deletions

267
port.py
View File

@@ -1,5 +1,5 @@
import os
import requests
from concurrent.futures import ThreadPoolExecutor
import json
import uuid
import shutil
@@ -8,7 +8,8 @@ import logging
import base64
import tabulate
from git import Repo
from concurrent.futures import ThreadPoolExecutor
import requests
logger = logging.getLogger(__name__)
@@ -105,7 +106,7 @@ class Portainer:
self.get_stacks()
self.get_containers()
def is_number(self, s):
def _is_number(self, s):
"""Check if the input string is a number."""
try:
float(s)
@@ -113,14 +114,14 @@ class Portainer:
except ValueError:
return False
def api_get(self, path, timeout=120):
def _api_get(self, path, timeout=120):
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
resp = requests.get(url, headers=headers, timeout=timeout)
resp.raise_for_status()
return resp.json()
def api_post(self, path, json="", timeout=120):
def _api_post(self, path, json="", timeout=120):
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
# print(url)
@@ -128,7 +129,7 @@ class Portainer:
resp = requests.post(url, headers=headers, json=json, timeout=timeout)
return resp.text
def api_post_file(self, path, endpoint_id, name, envs, file, timeout=120):
def _api_post_file(self, path, endpoint_id, name, envs, file, timeout=120):
# input("API POST2 called. Press Enter to continue.")
"""Example authenticated GET request to Portainer API."""
url = f"{self.base_url.rstrip('/')}{path}"
@@ -141,7 +142,7 @@ class Portainer:
resp.raise_for_status()
return resp.json()
def api_post_no_body(self, path, timeout=120):
def _api_post_no_body(self, path, timeout=120):
"""Example authenticated GET request to Portainer API."""
url = f"{self.base_url.rstrip('/')}{path}"
# print(url)
@@ -149,7 +150,7 @@ class Portainer:
resp = requests.post(url, headers=headers, timeout=timeout)
return resp.text
def api_delete(self, path, timeout=120):
def _api_delete(self, path, timeout=120):
"""Example authenticated DELETE request to Portainer API."""
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
@@ -170,7 +171,7 @@ class Portainer:
endpoint_id = self.get_endpoint_id(endpoint_id)
path = "/stacks"
stcks = []
stacks = self.api_get(path, timeout=timeout)
stacks = self._api_get(path, timeout=timeout)
self.stacks_all = {}
fail_endponts = [20, 39, 41]
# print(json.dumps(stacks,indent=2))
@@ -231,17 +232,14 @@ class Portainer:
# input(json.dumps(self.stacks_all,indent=2))
return stcks
def get_stack_id(self, endpoint, stack):
pass
def update_status(self, endpoint, stack):
path = f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true"
# input(path)
stats = self.api_get(path)
stats = self._api_get(path)
print(stats)
def get_endpoint_id(self, endpoint):
if self.is_number(endpoint):
if self._is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint]
return endpoint
@@ -251,7 +249,7 @@ class Portainer:
return self.endpoints["by_name"][endpoint]
def get_endpoint_name(self, endpoint):
if self.is_number(endpoint):
if self._is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint]
return self.endpoints["by_id"][endpoint]
@@ -282,7 +280,7 @@ class Portainer:
)
logging.info(f"request : {path}")
try:
containers = self.api_get(path)
containers = self._api_get(path)
except Exception as e:
print(f"failed to get containers from {path}: {e}")
continue
@@ -318,14 +316,14 @@ class Portainer:
def stop(c):
print(f" > Stopping {c}")
self.api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/stop")
self._api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/stop")
# print(f"✔")
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(stop, containers)
# for c in containers:
# print(f" > Stopping {c}")
# self.api_post_no_body(f"/endpoints/{self.endpoints["by_name"][endpoint]}/docker/containers/{c}/stop")
# self._api_post_no_body(f"/endpoints/{self.endpoints["by_name"][endpoint]}/docker/containers/{c}/stop")
# return 0
def start_containers(self, endpoint, containers, timeout=130):
@@ -333,7 +331,7 @@ class Portainer:
def stop(c):
print(f" > Starting {c}")
self.api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/start")
self._api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/start")
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(stop, containers)
@@ -349,7 +347,7 @@ class Portainer:
# input(stcs)
def update(c):
print(f" > Updating {c[0]} on {endpoint}")
ans = self.api_post_no_body(f"/stacks/webhooks/{c[1]}")
ans = self._api_post_no_body(f"/stacks/webhooks/{c[1]}")
logger.debug(
f"Update response for stack {c[0]} on endpoint {endpoint}: {ans}"
)
@@ -373,7 +371,7 @@ class Portainer:
self.stop_containers(endpoint, cont)
def get_endpoints(self, timeout=10):
endpoints = self.api_get("/endpoints")
endpoints = self._api_get("/endpoints")
eps = {"by_id": {}, "by_name": {}}
eps_stats = {}
for ep in endpoints:
@@ -391,7 +389,7 @@ class Portainer:
def get_endpoint(self, endpoint_id=None, timeout=30):
self.get_endpoints()
# print(self.endpoints)
if self.is_number(endpoint_id):
if self._is_number(endpoint_id):
self.endpoint_name = self.endpoints["by_id"][endpoint_id]
self.endpoint_id = endpoint_id
else:
@@ -402,12 +400,12 @@ class Portainer:
def get_swarm_id(self, endpoint):
ep_id = self.endpoints["by_name"][endpoint]
path = f"/endpoints/{ep_id}/docker/info"
stats = self.api_get(path)
stats = self._api_get(path)
return stats["Swarm"]["Cluster"]["ID"]
def get_stack(self, stack=None, endpoint_id=None, timeout=None):
self.get_stacks(endpoint_id)
if not self.is_number(endpoint_id):
if not self._is_number(endpoint_id):
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
self.stack_id = []
if stack == "all":
@@ -446,7 +444,6 @@ class Portainer:
mode="git",
autostart=False,
swarm=False,
timeout=None,
):
if swarm:
swarm_id = self.get_swarm_id(endpoint)
@@ -564,7 +561,7 @@ class Portainer:
req["ConfigFilePath"] = f"__swarm/{stack}/{stack}-swarm.yml"
print(json.dumps(req))
res = self.api_post(path, req)
res = self._api_post(path, req)
if "Id" in res:
# print("Deploy request OK")
pass
@@ -658,9 +655,10 @@ class Portainer:
open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb"),
),
}
self.api_post_file(path, self.endpoint_id, stack, envs, file)
self._api_post_file(path, self.endpoint_id, stack, envs, file)
def print_stacks(self, endpoint="all"):
'''Print a table of stacks, optionally filtered by endpoint.'''
stacks = self.get_stacks()
count = 0
data = []
@@ -682,7 +680,7 @@ class Portainer:
except KeyError as e:
data.append([stack["Id"], stack["Name"], "?"])
logger.debug(
f"KeyError getting endpoint name for stack {stack['Name']}: {e}"
"KeyError getting endpoint name for stack %s : %s", stack['Name'],e
)
count += 1
@@ -691,31 +689,33 @@ class Portainer:
print(f"Total stacks: {count}")
def start_stack(self, stack=None, endpoint_id=None):
'''Start one stack or all stacks on an endpoint.'''
if endpoint_id is not None:
print("Getting endpoint")
self.get_endpoint(endpoint_id)
if stack is not None:
self.get_stack(stack, endpoint_id)
for stack in self.stack_ids:
path = f"/stacks/{stack}/start"
for stck in self.stack_ids:
path = f"/stacks/{stck}/start"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
try:
resp = self.api_post_no_body(path, timeout=20)
except Exception as e:
resp = self._api_post_no_body(path, timeout=20)
except ValueError as e:
print(f"Error stoping stack: {e}")
return []
if "Id" in json.loads(resp):
print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : started"
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : started"
)
else:
print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}"
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : {json.loads(resp)['message']}"
)
return True
def stop_stack(self, stack, endpoint_id):
'''Stop one stack or all stacks on an endpoint.'''
print(f"Stopping stack {stack}")
if endpoint_id is not None:
self.get_endpoint(endpoint_id)
@@ -724,100 +724,169 @@ class Portainer:
else:
if stack is not None:
self.stack_ids = [self.get_stack(stack, endpoint_id)["Id"]]
for stack in self.stack_ids:
for stck in self.stack_ids:
path = f"/stacks/{stack}/stop"
if self.endpoint_id is not None:
path += f"?endpointId={self.endpoint_id}"
try:
resp = self.api_post_no_body(path, timeout=120)
resp = self._api_post_no_body(path, timeout=120)
except NameError as e:
print(f"Error stopping stack: {e}")
return []
if "Id" in json.loads(resp):
print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : stopped"
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : stopped"
)
else:
print(
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}"
f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : {json.loads(resp)['message']}"
)
return True
def delete_stack(self, endpoint_id=None, stack=None, timeout=None):
"""
Return a list of stacks. If endpoint_id is provided, it will be added as a query param.
"""
def _resolve_endpoint(self, endpoint_id):
self.get_endpoints()
if self.is_number(endpoint_id):
self.endpoint_name = self.endpoints["by_id"][endpoint_id]
self.endpoint_id = endpoint_id
if self._is_number(endpoint_id):
self.endpoint_id = int(endpoint_id)
self.endpoint_name = self.endpoints["by_id"][self.endpoint_id]
else:
self.endpoint_name = endpoint_id
self.endpoint_id = self.endpoints["by_name"][endpoint_id]
self.endpoint_id = int(self.endpoints["by_name"][endpoint_id])
if not self.is_number(endpoint_id):
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
if not self.is_number(stack) and stack != "all":
# print(stack)
# print(self.endpoint_id)
stack = self.get_stack(stack, self.endpoint_id)["Id"]
def _resolve_stack_id(self, stack, endpoint_id):
if stack == "all":
stacks = self.get_stacks(self.endpoint_id)
paths = []
for s in stacks:
# print(f"Delete stack {s['Name']}")
# print(s['EndpointId'], endpoint_id)
if int(s["EndpointId"]) != int(endpoint_id):
continue
# print("Deleting stack:", s['Name'])
path = f"/stacks/{s['Id']}"
if endpoint_id is not None:
path += f"?endpointId={endpoint_id}&removeVolumes=true"
paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path])
# input(paths)
return "all"
def delete(c):
print(f"Delete stack {c[1]} from {c[0]} ")
out = self.api_delete(c[2])
logger.debug(f"Deleted stack {c[1]} from {c[0]}: {out}")
if not self._is_number(stack):
result = self.get_stack(stack, endpoint_id)
return result["Id"]
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(delete, paths)
return "Done"
else:
path = f"/stacks/{stack}"
return int(stack)
def _delete_all_stacks(self, endpoint_id):
stacks = self.get_stacks(endpoint_id)
paths = []
if endpoint_id is not None:
path += f"?endpointId={endpoint_id}&removeVolumes=true"
# print(path)
try:
# print(path)
# print(base_url)
# print(token)
stacks = self.api_delete(path)
except Exception as e:
# print(f"Error creating stack: {e}")
if "Conflict for url" in str(e):
print("Stack with this name may already exist.")
else:
print(f"Error deleting stack: {e}")
# print(stacks)
return []
if stacks is None:
return []
for s in stacks:
if int(s["EndpointId"]) != int(endpoint_id):
continue
return stacks
path = f"/stacks/{s['Id']}?endpointId={endpoint_id}&removeVolumes=true"
paths.append([
self.get_endpoint_name(endpoint_id),
s["Name"],
path
])
def refresh_status(self, stack, timeout=None):
pass
def delete_item(item):
print(f"Delete stack {item[1]} from {item[0]}")
out = self._api_delete(item[2])
logger.debug("Deleted stack %s from %s: %s", item[1], item[0], out)
def __repr__(self):
pass
with ThreadPoolExecutor(max_workers=10) as exe:
exe.map(delete_item, paths)
return "Done"
def _delete_single_stack(self, stack_id, endpoint_id):
path = f"/stacks/{stack_id}?endpointId={endpoint_id}&removeVolumes=true"
try:
out = self._api_delete(path)
except ValueError as e:
msg = str(e)
if "Conflict for url" in msg:
print("Stack with this name may already exist.")
else:
print(f"Error deleting stack: {e}")
return []
return out or []
def delete_stack(self, endpoint_id=None, stack=None):
"""Delete one stack or all stacks on an endpoint."""
self._resolve_endpoint(endpoint_id)
endpoint_id = self.endpoint_id
stack_id = self._resolve_stack_id(stack, endpoint_id)
if stack == "all":
return self._delete_all_stacks(endpoint_id)
return self._delete_single_stack(stack_id, endpoint_id)
# def delete_stack(self, endpoint_id=None, stack=None):
# """
# Return a list of stacks. If endpoint_id is provided, it will be added as a query param.
# """
# self.get_endpoints()
# if self._is_number(endpoint_id):
# self.endpoint_name = self.endpoints["by_id"][endpoint_id]
# self.endpoint_id = endpoint_id
# else:
# self.endpoint_name = endpoint_id
# self.endpoint_id = self.endpoints["by_name"][endpoint_id]
# if not self._is_number(endpoint_id):
# endpoint_id = int(self.endpoints["by_name"][endpoint_id])
# if not self._is_number(stack) and stack != "all":
# # print(stack)
# # print(self.endpoint_id)
# stack = self.get_stack(stack, self.endpoint_id)["Id"]
# if stack == "all":
# stacks = self.get_stacks(self.endpoint_id)
# paths = []
# for s in stacks:
# # print(f"Delete stack {s['Name']}")
# # print(s['EndpointId'], endpoint_id)
# if int(s["EndpointId"]) != int(endpoint_id):
# continue
# # print("Deleting stack:", s['Name'])
# path = f"/stacks/{s['Id']}"
# if endpoint_id is not None:
# path += f"?endpointId={endpoint_id}&removeVolumes=true"
# paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path])
# # input(paths)
# def delete(c):
# print(f"Delete stack {c[1]} from {c[0]} ")
# out = self._api_delete(c[2])
# logger.debug(f"Deleted stack {c[1]} from {c[0]}: {out}")
# with ThreadPoolExecutor(max_workers=10) as exe:
# exe.map(delete, paths)
# return "Done"
# else:
# path = f"/stacks/{stack}"
# if endpoint_id is not None:
# path += f"?endpointId={endpoint_id}&removeVolumes=true"
# # print(path)
# try:
# # print(path)
# # print(base_url)
# # print(token)
# stacks = self._api_delete(path)
# except Exception as e:
# # print(f"Error creating stack: {e}")
# if "Conflict for url" in str(e):
# print("Stack with this name may already exist.")
# else:
# print(f"Error deleting stack: {e}")
# # print(stacks)
# return []
# if stacks is None:
# return []
# return stacks
def create_secret(self, name, value, endpoint_id=None, timeout=None):
'''Create a Docker secret on the specified endpoint.'''
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
path = f"/endpoints/{endpoint_id}/docker/secrets/create"
encoded = base64.b64encode(value.encode()).decode()
data = {"Name": name, "Data": encoded}
return self.api_post(path, data, timeout=timeout)
return self._api_post(path, data, timeout=timeout)

View File

@@ -141,16 +141,6 @@ def wl(msg):
if args.debug:
print(msg)
def is_number(s):
"""Check if the input string is a number."""
try:
float(s)
return True
except ValueError:
return False
def prompt_missing_args(args_in, defaults_in, fields):
"""
fields = [("arg_name", "Prompt text")]