From ee01577a6cee55f886b2a0f81b945ad1f3819b13 Mon Sep 17 00:00:00 2001 From: jaydee Date: Thu, 4 Dec 2025 22:02:58 +0100 Subject: [PATCH] build --- port.py | 71 ++++++++++++++++++++-------------- portainer.py | 107 ++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 135 insertions(+), 43 deletions(-) diff --git a/port.py b/port.py index 0c9ae7d..80130a8 100644 --- a/port.py +++ b/port.py @@ -1,3 +1,5 @@ +'''Portainer API wrapper module.''' + import os from concurrent.futures import ThreadPoolExecutor import json @@ -12,7 +14,6 @@ from git import Repo import requests - logger = logging.getLogger(__name__) @@ -108,18 +109,23 @@ class Portainer: self.get_endpoints() self.get_stacks() self.get_containers() - - + def get_site(self, site): if site == "portainer": - self.base_url = os.getenv("PORTAINER_URL", "https://portainer.sectorq.eu/api") + self.base_url = os.getenv( + "PORTAINER_URL", "https://portainer.sectorq.eu/api" + ) self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc=" elif site == "port": self.base_url = os.getenv("PORTAINER_URL", "https://port.sectorq.eu/api") self.token = "ptr_/5RkMCT/j3BTaL32vMSDtXFi76yOXRKVFOrUtzMsl5Y=" else: - self.base_url = os.getenv("PORTAINER_URL", "https://portainer.sectorq.eu/api") + self.base_url = os.getenv( + "PORTAINER_URL", "https://portainer.sectorq.eu/api" + ) self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc=" + self.get_endpoints() + self.get_stacks() def _is_number(self, s): """Check if the input string is a number.""" @@ -176,12 +182,14 @@ class Portainer: return resp.status_code def refresh(self): + '''Refresh all data from Portainer.''' self.get_endpoints() self.get_stacks(self) self.get_containers(self) return True def get_stacks(self, endpoint_id="all", timeout=10): + '''Get a list of stacks for a specific endpoint or all endpoints.''' if endpoint_id != "all": endpoint_id = self.get_endpoint_id(endpoint_id) path = "/stacks" @@ -248,12 +256,14 @@ class Portainer: return stcks def update_status(self, endpoint, stack): + '''Get the update status of a specific stack on an endpoint.''' path = f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true" # input(path) stats = self._api_get(path) print(stats) def get_endpoint_id(self, endpoint): + '''Get endpoint ID from either ID or name input.''' if self._is_number(endpoint): self.endpoint_id = endpoint self.endpoint_name = self.endpoints["by_id"][endpoint] @@ -264,6 +274,7 @@ class Portainer: return self.endpoints["by_name"][endpoint] def get_endpoint_name(self, endpoint): + '''Get endpoint name from either ID or name input.''' if self._is_number(endpoint): self.endpoint_id = endpoint self.endpoint_name = self.endpoints["by_id"][endpoint] @@ -274,6 +285,7 @@ class Portainer: return endpoint def get_containers(self, endpoint="all", stack="all", timeout=30): + '''Get a list of containers for a specific endpoint and stack.''' # print(json.dumps(self.all_data,indent=2)) # print(endpoint) # print(stack) @@ -325,6 +337,7 @@ class Portainer: return cont def stop_containers(self, endpoint, containers, timeout=130): + '''Stop containers on an endpoint.''' if self.all_data["endpoints_status"][endpoint] != 1: print(f"Endpoint {self.get_endpoint_name(endpoint)} is offline") ep_id = self.endpoints["by_name"][endpoint] @@ -342,6 +355,7 @@ class Portainer: # return 0 def start_containers(self, endpoint, containers, timeout=130): + '''Start containers on an endpoint.''' ep_id = self.endpoints["by_name"][endpoint] def stop(c): @@ -352,6 +366,7 @@ class Portainer: exe.map(stop, containers) def update_stack(self, endpoint, stack, autostart, timeout=130): + '''Update one stack or all stacks on an endpoint.''' stcs = [] if stack == "all": for s in self.all_data["webhooks"][endpoint]: @@ -386,6 +401,7 @@ class Portainer: self.stop_containers(endpoint, cont) def get_endpoints(self, timeout=10): + '''Get a list of all endpoints.''' endpoints = self._api_get("/endpoints") eps = {"by_id": {}, "by_name": {}} eps_stats = {} @@ -395,6 +411,7 @@ class Portainer: eps_stats[ep["Id"]] = ep["Status"] eps_stats[ep["Name"]] = ep["Status"] self.endpoints = eps + self.endpoints_names = list(eps["by_name"]) self.all_data["endpoints"] = eps self.all_data["endpoints_status"] = eps_stats # input(eps_stats) @@ -402,6 +419,7 @@ class Portainer: return eps def get_endpoint(self, endpoint_id=None, timeout=30): + '''Get endpoint ID and name from either ID or name input.''' self.get_endpoints() # print(self.endpoints) if self._is_number(endpoint_id): @@ -413,6 +431,7 @@ class Portainer: return self.endpoint_id def get_swarm_id(self, endpoint): + '''Get the swarm ID for a specific endpoint.''' ep_id = self.endpoints["by_name"][endpoint] path = f"/endpoints/{ep_id}/docker/info" stats = self._api_get(path) @@ -459,9 +478,9 @@ class Portainer: stack=None, mode="git", autostart=False, - stack_mode='swarm', + stack_mode="swarm", ): - if stack_mode == 'swarm': + if stack_mode == "swarm": swarm_id = self.get_swarm_id(endpoint) p = "swarm" env_path = f"{self.repo_dir}/__swarm/{stack}/.env" @@ -505,7 +524,7 @@ class Portainer: if stack_check: print(f"Stack {stack} already exist") continue - print(f"Working on {stack}") + print(f"Working on {stack} , stack mode: {stack_mode}") envs = [] if os.path.exists(f"{env_path}"): @@ -571,13 +590,13 @@ class Portainer: "method": "repository", "swarmID": None, } - if stack_mode == 'swarm': + if stack_mode == "swarm": req["type"] = "swarm" req["swarmID"] = swarm_id req["composeFile"] = f"__swarm/{stack}/{stack}-swarm.yml" req["ConfigFilePath"] = f"__swarm/{stack}/{stack}-swarm.yml" - - print(json.dumps(req)) + if self._debug: + print(json.dumps(req)) res = self._api_post(path, req) if "Id" in res: # print("Deploy request OK") @@ -675,7 +694,7 @@ class Portainer: self._api_post_file(path, self.endpoint_id, stack, envs, file) def print_stacks(self, endpoint="all"): - '''Print a table of stacks, optionally filtered by endpoint.''' + """Print a table of stacks, optionally filtered by endpoint.""" stacks = self.get_stacks() count = 0 data = [] @@ -697,7 +716,7 @@ class Portainer: except KeyError as e: data.append([stack["Id"], stack["Name"], "?"]) logger.debug( - "KeyError getting endpoint name for stack %s : %s", stack['Name'],e + "KeyError getting endpoint name for stack %s : %s", stack["Name"], e ) count += 1 @@ -706,7 +725,7 @@ class Portainer: print(f"Total stacks: {count}") def start_stack(self, stack=None, endpoint_id=None): - '''Start one stack or all stacks on an endpoint.''' + """Start one stack or all stacks on an endpoint.""" if endpoint_id is not None: print("Getting endpoint") self.get_endpoint(endpoint_id) @@ -732,7 +751,7 @@ class Portainer: return True def stop_stack(self, stack, endpoint_id): - '''Stop one stack or all stacks on an endpoint.''' + """Stop one stack or all stacks on an endpoint.""" print(f"Stopping stack {stack}") if endpoint_id is not None: self.get_endpoint(endpoint_id) @@ -761,10 +780,9 @@ class Portainer: f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stck]} : {json.loads(resp)['message']}" ) return True - - + def _resolve_endpoint(self, endpoint_id): - + self.get_endpoints() if self._debug: print(endpoint_id) @@ -775,7 +793,7 @@ class Portainer: else: self.endpoint_name = endpoint_id self.endpoint_id = int(self.endpoints["by_name"][endpoint_id]) - + def _resolve_stack_id(self, stack, endpoint_id): if stack == "all": return "all" @@ -785,6 +803,7 @@ class Portainer: return result["Id"] return int(stack) + def _delete_all_stacks(self, endpoint_id): stacks = self.get_stacks(endpoint_id) paths = [] @@ -794,11 +813,7 @@ class Portainer: continue path = f"/stacks/{s['Id']}?endpointId={endpoint_id}&removeVolumes=true" - paths.append([ - self.get_endpoint_name(endpoint_id), - s["Name"], - path - ]) + paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path]) def delete_item(item): print(f"Delete stack {item[1]} from {item[0]}") @@ -809,7 +824,7 @@ class Portainer: exe.map(delete_item, paths) return "Done" - + def _delete_single_stack(self, stack_id, endpoint_id): path = f"/stacks/{stack_id}?endpointId={endpoint_id}&removeVolumes=true" @@ -836,9 +851,7 @@ class Portainer: return self._delete_all_stacks(endpoint_id) return self._delete_single_stack(stack_id, endpoint_id) - - - + # def delete_stack(self, endpoint_id=None, stack=None): # """ # Return a list of stacks. If endpoint_id is provided, it will be added as a query param. @@ -906,7 +919,7 @@ class Portainer: # return stacks def create_secret(self, name, value, endpoint_id=None, timeout=None): - '''Create a Docker secret on the specified endpoint.''' + """Create a Docker secret on the specified endpoint.""" endpoint_id = int(self.endpoints["by_name"][endpoint_id]) path = f"/endpoints/{endpoint_id}/docker/secrets/create" encoded = base64.b64encode(value.encode()).decode() diff --git a/portainer.py b/portainer.py index 8fab035..6793887 100755 --- a/portainer.py +++ b/portainer.py @@ -16,7 +16,8 @@ import tty import termios from tabulate import tabulate from port import Portainer - +from prompt_toolkit import prompt +from prompt_toolkit.completion import WordCompleter VERSION = "0.0.5" @@ -42,7 +43,7 @@ else: print("No /myapps/portainer.conf file found, proceeding with env vars.") os.makedirs("/myapps", exist_ok=True) - + for field in defaults.keys(): value_in = os.getenv(f"PORTAINER_{field.upper()}") if value_in is not None: @@ -176,16 +177,17 @@ logging.info("script started") logger = logging.getLogger(__name__) - def wl(msg): """Write log message if debug is enabled.""" if args.debug: print(msg) + def prompt_missing_args(args_in, defaults_in, fields): """ fields = [("arg_name", "Prompt text")] """ + def input_with_default(prompt, default, longest): full_prompt = f" >> {prompt:{longest}}" sys.stdout.write(full_prompt) @@ -219,11 +221,12 @@ def prompt_missing_args(args_in, defaults_in, fields): # rewrite final line cleanly sys.stdout.write(f"\r{full_prompt}{user_input:10} {checkmark}\n") sys.stdout.flush() - + return user_input + longest = 0 for field, text in fields: - a = text + " (default= " + cur_config["PORTAINER_" + field.upper()] + ")" + a = text + " (default= " + cur_config["PORTAINER_" + field.upper()] + ")" if len(a) > longest: longest = len(a) @@ -233,24 +236,96 @@ def prompt_missing_args(args_in, defaults_in, fields): cur_site = defaults_in.get("PORTAINER_SITE".upper()) if value_in is None: if default is not None: - prompt = f"{text} (default={default}) : " + prompt_text = f"{text} (default={default}) : " # value_in = input(prompt) or default - value_in = input_with_default(prompt, default, longest+2) - defaults_in[f"PORTAINER_{field}".upper()] = value_in + + if field == "site": + commands = ["portainer", "port"] + elif field == "deploy_mode": + commands = ["git", "upload"] + elif field == "stack_mode": + commands = ["swarm", "compose"] + elif field == "endpoint_id": + commands = por.endpoints_names + elif field == "stack": + if args.action == "create_stack": + commands = [] + else: + commands = [] + if por._debug: + input(por.stacks_all) + # print(defaults_in[f"PORTAINER_ENDPOINT_ID".upper()]) + try: + for s in por.stacks_all[ + defaults_in[f"PORTAINER_ENDPOINT_ID".upper()] + ]["by_name"].keys(): + commands.append(s) + except KeyError: + print( + "No stacks found for endpoint", + defaults_in[f"PORTAINER_ENDPOINT_ID".upper()], + ) + sys.exit(1) + else: + commands = [] + completer = WordCompleter( + commands, ignore_case=True, match_middle=False + ) + try: + value_in = ( + prompt( + f" >> {prompt_text}", + completer=completer, + placeholder=default, + ) + or default + ) + except KeyboardInterrupt: + print("\n^C received — exiting cleanly.") + sys.exit(0) + + # value_in = input_with_default(prompt_text, default, longest+2) + else: - #value_in = input(f"{text}: ") - value_in = input_with_default(text, default, longest+2) - defaults_in[f"PORTAINER_{field}".upper()] = value_in + # value_in = input(f"{text}: ") + commands = ["start", "stop", "status", "restart", "reload", "exit"] + completer = WordCompleter(commands, ignore_case=True) + try: + value_in = ( + prompt( + f" >> {text} {default}", + completer=completer, + placeholder=default, + ) + or default + ) + except KeyboardInterrupt: + print("\n^C received — exiting cleanly.") + sys.exit(0) + + # value_in = input_with_default(text, default, longest+2) + + if por._debug: + print("Value entered:", value_in) + defaults_in[f"PORTAINER_{field}".upper()] = value_in setattr(args, field, value_in) os.environ[field] = value_in if field == "site" and value_in != cur_site: por.get_site(value_in) + if por._debug: + print(f"{defaults_in} {field} {value_in}") + if field == "endpoint_id" and value_in != defaults_in.get( + "PORTAINER_ENDPOINT_ID".upper() + ): + print("refreshing environment") + por.get_endpoints() with open("/myapps/portainer.conf", "w") as f: for k in defaults_in.keys(): f.write(f"{k}={defaults_in[k]}\n") - + return args + print(cur_config) if __name__ == "__main__": # Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below. @@ -289,7 +364,7 @@ if __name__ == "__main__": args.action = actions[int(ans) - 1] os.system("cls" if os.name == "nt" else "clear") # Example: list endpoints - por = Portainer(defaults['site'], timeout=args.timeout) + por = Portainer(defaults["site"], timeout=args.timeout) if args.debug: por._debug = True if args.action == "secrets": @@ -318,6 +393,10 @@ if __name__ == "__main__": ("stack", "Stack name or ID"), ], ) + + input( + f"Delete stack {args.stack} on endpoint {args.endpoint_id}. Press ENTER to continue..." + ) por.delete_stack( args.endpoint_id, args.stack, @@ -355,7 +434,7 @@ if __name__ == "__main__": ("stack", "Stack name or ID"), ], ) - + por.stop_stack(args.stack, args.endpoint_id) sys.exit()