This commit is contained in:
2025-12-04 22:02:58 +01:00
parent f8b0acdfcc
commit ee01577a6c
2 changed files with 135 additions and 43 deletions

57
port.py
View File

@@ -1,3 +1,5 @@
'''Portainer API wrapper module.'''
import os
from concurrent.futures import ThreadPoolExecutor
import json
@@ -12,7 +14,6 @@ from git import Repo
import requests
logger = logging.getLogger(__name__)
@@ -109,17 +110,22 @@ class Portainer:
self.get_stacks()
self.get_containers()
def get_site(self, site):
if site == "portainer":
self.base_url = os.getenv("PORTAINER_URL", "https://portainer.sectorq.eu/api")
self.base_url = os.getenv(
"PORTAINER_URL", "https://portainer.sectorq.eu/api"
)
self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc="
elif site == "port":
self.base_url = os.getenv("PORTAINER_URL", "https://port.sectorq.eu/api")
self.token = "ptr_/5RkMCT/j3BTaL32vMSDtXFi76yOXRKVFOrUtzMsl5Y="
else:
self.base_url = os.getenv("PORTAINER_URL", "https://portainer.sectorq.eu/api")
self.base_url = os.getenv(
"PORTAINER_URL", "https://portainer.sectorq.eu/api"
)
self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc="
self.get_endpoints()
self.get_stacks()
def _is_number(self, s):
"""Check if the input string is a number."""
@@ -176,12 +182,14 @@ class Portainer:
return resp.status_code
def refresh(self):
'''Refresh all data from Portainer.'''
self.get_endpoints()
self.get_stacks(self)
self.get_containers(self)
return True
def get_stacks(self, endpoint_id="all", timeout=10):
'''Get a list of stacks for a specific endpoint or all endpoints.'''
if endpoint_id != "all":
endpoint_id = self.get_endpoint_id(endpoint_id)
path = "/stacks"
@@ -248,12 +256,14 @@ class Portainer:
return stcks
def update_status(self, endpoint, stack):
'''Get the update status of a specific stack on an endpoint.'''
path = f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true"
# input(path)
stats = self._api_get(path)
print(stats)
def get_endpoint_id(self, endpoint):
'''Get endpoint ID from either ID or name input.'''
if self._is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint]
@@ -264,6 +274,7 @@ class Portainer:
return self.endpoints["by_name"][endpoint]
def get_endpoint_name(self, endpoint):
'''Get endpoint name from either ID or name input.'''
if self._is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint]
@@ -274,6 +285,7 @@ class Portainer:
return endpoint
def get_containers(self, endpoint="all", stack="all", timeout=30):
'''Get a list of containers for a specific endpoint and stack.'''
# print(json.dumps(self.all_data,indent=2))
# print(endpoint)
# print(stack)
@@ -325,6 +337,7 @@ class Portainer:
return cont
def stop_containers(self, endpoint, containers, timeout=130):
'''Stop containers on an endpoint.'''
if self.all_data["endpoints_status"][endpoint] != 1:
print(f"Endpoint {self.get_endpoint_name(endpoint)} is offline")
ep_id = self.endpoints["by_name"][endpoint]
@@ -342,6 +355,7 @@ class Portainer:
# return 0
def start_containers(self, endpoint, containers, timeout=130):
'''Start containers on an endpoint.'''
ep_id = self.endpoints["by_name"][endpoint]
def stop(c):
@@ -352,6 +366,7 @@ class Portainer:
exe.map(stop, containers)
def update_stack(self, endpoint, stack, autostart, timeout=130):
'''Update one stack or all stacks on an endpoint.'''
stcs = []
if stack == "all":
for s in self.all_data["webhooks"][endpoint]:
@@ -386,6 +401,7 @@ class Portainer:
self.stop_containers(endpoint, cont)
def get_endpoints(self, timeout=10):
'''Get a list of all endpoints.'''
endpoints = self._api_get("/endpoints")
eps = {"by_id": {}, "by_name": {}}
eps_stats = {}
@@ -395,6 +411,7 @@ class Portainer:
eps_stats[ep["Id"]] = ep["Status"]
eps_stats[ep["Name"]] = ep["Status"]
self.endpoints = eps
self.endpoints_names = list(eps["by_name"])
self.all_data["endpoints"] = eps
self.all_data["endpoints_status"] = eps_stats
# input(eps_stats)
@@ -402,6 +419,7 @@ class Portainer:
return eps
def get_endpoint(self, endpoint_id=None, timeout=30):
'''Get endpoint ID and name from either ID or name input.'''
self.get_endpoints()
# print(self.endpoints)
if self._is_number(endpoint_id):
@@ -413,6 +431,7 @@ class Portainer:
return self.endpoint_id
def get_swarm_id(self, endpoint):
'''Get the swarm ID for a specific endpoint.'''
ep_id = self.endpoints["by_name"][endpoint]
path = f"/endpoints/{ep_id}/docker/info"
stats = self._api_get(path)
@@ -459,9 +478,9 @@ class Portainer:
stack=None,
mode="git",
autostart=False,
stack_mode='swarm',
stack_mode="swarm",
):
if stack_mode == 'swarm':
if stack_mode == "swarm":
swarm_id = self.get_swarm_id(endpoint)
p = "swarm"
env_path = f"{self.repo_dir}/__swarm/{stack}/.env"
@@ -505,7 +524,7 @@ class Portainer:
if stack_check:
print(f"Stack {stack} already exist")
continue
print(f"Working on {stack}")
print(f"Working on {stack} , stack mode: {stack_mode}")
envs = []
if os.path.exists(f"{env_path}"):
@@ -571,12 +590,12 @@ class Portainer:
"method": "repository",
"swarmID": None,
}
if stack_mode == 'swarm':
if stack_mode == "swarm":
req["type"] = "swarm"
req["swarmID"] = swarm_id
req["composeFile"] = f"__swarm/{stack}/{stack}-swarm.yml"
req["ConfigFilePath"] = f"__swarm/{stack}/{stack}-swarm.yml"
if self._debug:
print(json.dumps(req))
res = self._api_post(path, req)
if "Id" in res:
@@ -675,7 +694,7 @@ class Portainer:
self._api_post_file(path, self.endpoint_id, stack, envs, file)
def print_stacks(self, endpoint="all"):
'''Print a table of stacks, optionally filtered by endpoint.'''
"""Print a table of stacks, optionally filtered by endpoint."""
stacks = self.get_stacks()
count = 0
data = []
@@ -697,7 +716,7 @@ class Portainer:
except KeyError as e:
data.append([stack["Id"], stack["Name"], "?"])
logger.debug(
"KeyError getting endpoint name for stack %s : %s", stack['Name'],e
"KeyError getting endpoint name for stack %s : %s", stack["Name"], e
)
count += 1
@@ -706,7 +725,7 @@ class Portainer:
print(f"Total stacks: {count}")
def start_stack(self, stack=None, endpoint_id=None):
'''Start one stack or all stacks on an endpoint.'''
"""Start one stack or all stacks on an endpoint."""
if endpoint_id is not None:
print("Getting endpoint")
self.get_endpoint(endpoint_id)
@@ -732,7 +751,7 @@ class Portainer:
return True
def stop_stack(self, stack, endpoint_id):
'''Stop one stack or all stacks on an endpoint.'''
"""Stop one stack or all stacks on an endpoint."""
print(f"Stopping stack {stack}")
if endpoint_id is not None:
self.get_endpoint(endpoint_id)
@@ -762,7 +781,6 @@ class Portainer:
)
return True
def _resolve_endpoint(self, endpoint_id):
self.get_endpoints()
@@ -785,6 +803,7 @@ class Portainer:
return result["Id"]
return int(stack)
def _delete_all_stacks(self, endpoint_id):
stacks = self.get_stacks(endpoint_id)
paths = []
@@ -794,11 +813,7 @@ class Portainer:
continue
path = f"/stacks/{s['Id']}?endpointId={endpoint_id}&removeVolumes=true"
paths.append([
self.get_endpoint_name(endpoint_id),
s["Name"],
path
])
paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path])
def delete_item(item):
print(f"Delete stack {item[1]} from {item[0]}")
@@ -837,8 +852,6 @@ class Portainer:
return self._delete_single_stack(stack_id, endpoint_id)
# def delete_stack(self, endpoint_id=None, stack=None):
# """
# Return a list of stacks. If endpoint_id is provided, it will be added as a query param.
@@ -906,7 +919,7 @@ class Portainer:
# return stacks
def create_secret(self, name, value, endpoint_id=None, timeout=None):
'''Create a Docker secret on the specified endpoint.'''
"""Create a Docker secret on the specified endpoint."""
endpoint_id = int(self.endpoints["by_name"][endpoint_id])
path = f"/endpoints/{endpoint_id}/docker/secrets/create"
encoded = base64.b64encode(value.encode()).decode()

View File

@@ -16,7 +16,8 @@ import tty
import termios
from tabulate import tabulate
from port import Portainer
from prompt_toolkit import prompt
from prompt_toolkit.completion import WordCompleter
VERSION = "0.0.5"
@@ -176,16 +177,17 @@ logging.info("script started")
logger = logging.getLogger(__name__)
def wl(msg):
"""Write log message if debug is enabled."""
if args.debug:
print(msg)
def prompt_missing_args(args_in, defaults_in, fields):
"""
fields = [("arg_name", "Prompt text")]
"""
def input_with_default(prompt, default, longest):
full_prompt = f" >> {prompt:{longest}}"
sys.stdout.write(full_prompt)
@@ -221,6 +223,7 @@ def prompt_missing_args(args_in, defaults_in, fields):
sys.stdout.flush()
return user_input
longest = 0
for field, text in fields:
a = text + " (default= " + cur_config["PORTAINER_" + field.upper()] + ")"
@@ -233,24 +236,96 @@ def prompt_missing_args(args_in, defaults_in, fields):
cur_site = defaults_in.get("PORTAINER_SITE".upper())
if value_in is None:
if default is not None:
prompt = f"{text} (default={default}) : "
prompt_text = f"{text} (default={default}) : "
# value_in = input(prompt) or default
value_in = input_with_default(prompt, default, longest+2)
defaults_in[f"PORTAINER_{field}".upper()] = value_in
if field == "site":
commands = ["portainer", "port"]
elif field == "deploy_mode":
commands = ["git", "upload"]
elif field == "stack_mode":
commands = ["swarm", "compose"]
elif field == "endpoint_id":
commands = por.endpoints_names
elif field == "stack":
if args.action == "create_stack":
commands = []
else:
commands = []
if por._debug:
input(por.stacks_all)
# print(defaults_in[f"PORTAINER_ENDPOINT_ID".upper()])
try:
for s in por.stacks_all[
defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]
]["by_name"].keys():
commands.append(s)
except KeyError:
print(
"No stacks found for endpoint",
defaults_in[f"PORTAINER_ENDPOINT_ID".upper()],
)
sys.exit(1)
else:
commands = []
completer = WordCompleter(
commands, ignore_case=True, match_middle=False
)
try:
value_in = (
prompt(
f" >> {prompt_text}",
completer=completer,
placeholder=default,
)
or default
)
except KeyboardInterrupt:
print("\n^C received — exiting cleanly.")
sys.exit(0)
# value_in = input_with_default(prompt_text, default, longest+2)
else:
# value_in = input(f"{text}: ")
value_in = input_with_default(text, default, longest+2)
commands = ["start", "stop", "status", "restart", "reload", "exit"]
completer = WordCompleter(commands, ignore_case=True)
try:
value_in = (
prompt(
f" >> {text} {default}",
completer=completer,
placeholder=default,
)
or default
)
except KeyboardInterrupt:
print("\n^C received — exiting cleanly.")
sys.exit(0)
# value_in = input_with_default(text, default, longest+2)
if por._debug:
print("Value entered:", value_in)
defaults_in[f"PORTAINER_{field}".upper()] = value_in
setattr(args, field, value_in)
os.environ[field] = value_in
if field == "site" and value_in != cur_site:
por.get_site(value_in)
if por._debug:
print(f"{defaults_in} {field} {value_in}")
if field == "endpoint_id" and value_in != defaults_in.get(
"PORTAINER_ENDPOINT_ID".upper()
):
print("refreshing environment")
por.get_endpoints()
with open("/myapps/portainer.conf", "w") as f:
for k in defaults_in.keys():
f.write(f"{k}={defaults_in[k]}\n")
return args
print(cur_config)
if __name__ == "__main__":
# Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below.
@@ -289,7 +364,7 @@ if __name__ == "__main__":
args.action = actions[int(ans) - 1]
os.system("cls" if os.name == "nt" else "clear")
# Example: list endpoints
por = Portainer(defaults['site'], timeout=args.timeout)
por = Portainer(defaults["site"], timeout=args.timeout)
if args.debug:
por._debug = True
if args.action == "secrets":
@@ -318,6 +393,10 @@ if __name__ == "__main__":
("stack", "Stack name or ID"),
],
)
input(
f"Delete stack {args.stack} on endpoint {args.endpoint_id}. Press ENTER to continue..."
)
por.delete_stack(
args.endpoint_id,
args.stack,