From 04248ce279f01285986e6dcb05cbc435a9f64cd1 Mon Sep 17 00:00:00 2001 From: jaydee Date: Fri, 12 Dec 2025 10:39:06 +0100 Subject: [PATCH] build --- port.py | 117 ++++++++++++++++++++++++++++----------------------- portainer.py | 46 +++++--------------- 2 files changed, 74 insertions(+), 89 deletions(-) diff --git a/port.py b/port.py index 60a1992..8578671 100644 --- a/port.py +++ b/port.py @@ -27,9 +27,10 @@ class Portainer: to perform API operations. """ - def __init__(self, site, timeout=10): + def __init__(self, site, args=None, timeout=120): self.base_url = None self.token = None + self.args = args self.action = None self._debug = False self.timeout = timeout @@ -39,7 +40,7 @@ class Portainer: self.stack_id = None self.stack_ids = [] self.endpoint_name = None - self.endpoint_id = None + self.endpoint_id = args.endpoint_id # self.git_url = "https://gitlab.sectorq.eu/home/docker-compose.git" self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git" @@ -275,10 +276,10 @@ class Portainer: def get_services(self, endpoint, timeout=30): '''Get a list of services for a specific stack on an endpoint.''' - print(json.dumps(self.all_data,indent=2)) - path = f"/endpoints/{endpoint}/docker/services" - print(path) - #path += f'?filters={{"label": ["com.docker.compose.project={stack}"]}}' + # print(json.dumps(self.all_data,indent=2)) + path = f"/endpoints/{self.get_endpoint_id(endpoint)}/docker/services" + # print(path) + # path += f'?filters={{"label": ["com.docker.compose.project={stack}"]}}' services = self._api_get(path, timeout=timeout) return services @@ -399,7 +400,11 @@ class Portainer: for s in self.all_data["webhooks"][endpoint]: stcs.append([s, self.all_data["webhooks"][endpoint][s]]) else: - stcs.append([stack, self.all_data["webhooks"][endpoint][stack]]) + try: + stcs.append([stack, self.all_data["webhooks"][endpoint][stack]]) + except Exception as e: + print(f"Error: Stack {stack} not found on endpoint {endpoint}: {e}") + # input(stcs) def update(c): @@ -724,7 +729,6 @@ class Portainer: } self._api_post_file(path, self.endpoint_id, stack, envs, file) - def print_stacks(self, endpoint="all"): """Print a table of stacks, optionally filtered by endpoint.""" stacks = self.get_stacks() @@ -763,53 +767,60 @@ class Portainer: all_services = self.get_services(self.get_endpoint_id(endpoint_id)) service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services] - - input(service_tuples) - if service_id == "all": + service_tuples = sorted(service_tuples, key=lambda x: x[1]) + service_dict = dict(service_tuples) + # input(service_tuples) + if service_id is None: + #services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] + service_tuples.insert(0, ("__ALL__", "[Select ALL]")) + service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]")) + service_ids = checkboxlist_dialog( + title="Select one service", + text="Choose a service:", + values=service_tuples + ).run() + elif service_id == "all": + service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"] + else: + service_ids = [service_id] + if "__ONLY_CHECK__" in service_ids: + pull = False + else: + pull = False + if "__ALL__" in service_ids: + service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"] + + longest = 0 + for a in service_dict.items(): + # print(a[1]) + if len(a[1]) > longest: + longest = len(a[1]) + #print(longest) + ok = "\033[92m✔\033[0m" + err = "\033[91m✖\033[0m" + for service_id in service_ids: + print("\033[?25l", end="") + print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True) + path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true" - service_id = self.all - #services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] - services.insert(0, ("__ALL__", "[Select ALL]")) - service_ids = checkboxlist_dialog( - title="Select one service", - text="Choose a service:", - values=services - ).run() - if service_ids == "__ALL__": - pass - print(service_ids) - service_dict = dict(service_ids) - services = self.get_services(self.endpoint_name, stack_id) - svc_name = service_dict.get(stack_id) - stack_svcs = [] - svc_menu = [] - for s in services: try: - if svc_name in s['Spec']['Name']: - stack_svcs.append([s['Version']['Index'], s['Spec']['Name']]) - svc_menu.append([s['ID'], s['Spec']['Name']]) - except KeyError as e: - print(e) - - - service_id = radiolist_dialog( - title="Select one service", - text="Choose a service:", - values=svc_menu - ).run() - - - """Restart a service on an endpoint.""" - path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true" - - try: - resp = self._api_get(path, timeout=20) - except ValueError as e: - print(f"Error restarting service: {e}") - return [] - if resp['Status'] == "outdated": - self.restart_srv(service_id, True) - print(f"Service {service_id} : restarted") + resp = self._api_get(path, timeout=20) + except ValueError as e: + print(f"Error restarting service: {e}") + return [] + + if resp['Status'] == "outdated": + if pull: + self.restart_srv(service_id, pull) + print(f"Service {service_dict[service_id]:<{longest}} : updated") + else: + print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True) + #print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m") + pass + print(err) + else: + print(ok) + print("\033[?25h", end="") return True def restart_srv(self,service_id, pool=False): diff --git a/portainer.py b/portainer.py index f8f906a..f56a0d7 100755 --- a/portainer.py +++ b/portainer.py @@ -71,7 +71,7 @@ def load_config(defaults=defaults): -cur_config = load_config(defaults) +a = load_config(defaults) # ENV_VARS = [ # "PORTAINER_URL", @@ -116,46 +116,13 @@ parser.add_argument( default=None, help="Service ID to limit service operations", ) -parser.add_argument( - "--refresh-environment", "-R", action="store_true", help="List endpoints" -) -parser.add_argument( - "--list-endpoints", "-E", action="store_true", help="List endpoints" -) -parser.add_argument("--list-stacks", "-l", action="store_true", help="List stacks") -parser.add_argument("--print-all-data", "-A", action="store_true", help="List stacks") -parser.add_argument( - "--list-containers", "-c", action="store_true", help="List containers" -) -parser.add_argument("--update-stack", "-U", action="store_true", help="Update stacks") -parser.add_argument( - "--stop-containers", "-O", action="store_true", help="Stop containers" -) -parser.add_argument( - "--start-containers", "-X", action="store_true", help="Start containers" -) -parser.add_argument("--update-status", "-S", action="store_true", help="Update status") -parser.add_argument( - "--get-stack", metavar="NAME_OR_ID", help="Get stack by name or numeric id" -) +parser.add_argument("--stack", "-s", type=str, nargs="+", help="Stack ID for operations") parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform") parser.add_argument( "--autostart", "-Z", action="store_true", help="Auto-start created stacks" ) -parser.add_argument("--start-stack", "-x", action="store_true") -parser.add_argument("--stop-stack", "-o", action="store_true") -parser.add_argument("--secrets", "-q", action="store_true") parser.add_argument("--debug", "-D", action="store_true") -parser.add_argument("--create-stack", "-n", action="store_true") -parser.add_argument("--create-stack_new2", "-N", action="store_true") parser.add_argument("--gpu", "-g", action="store_true") -parser.add_argument("--create-stacks", "-C", action="store_true") -parser.add_argument("--refresh-status", "-r", action="store_true") - -parser.add_argument("--stack", "-s", type=str, nargs="+", help="Stack ID for operations") -parser.add_argument( - "--token-only", action="store_true", help="Print auth token and exit" -) parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds") parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode") parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode") @@ -235,10 +202,13 @@ def prompt_missing_args(args_in, defaults_in, fields, action=None,stacks=None): longest = len(a) for field, text in fields: + # print(field) value_in = getattr(args_in, field) default = defaults_in.get(f"PORTAINER_{field}".upper()) cur_site = defaults_in.get("PORTAINER_SITE".upper()) cur_env = defaults_in.get("PORTAINER_ENVIRONMENT_ID".upper()) + + # print(value_in) if value_in is None: if default is not None: prompt_text = f"{text} (default={default}) : " @@ -390,6 +360,7 @@ if __name__ == "__main__": def signal_handler(sig, frame): logger.warning("Killed manually %s, %s", sig, frame) print("\nTerminated by user") + print("\033[?25h", end="") sys.exit(0) signal.signal(signal.SIGINT, signal_handler) os.system("cls" if os.name == "nt" else "clear") @@ -438,7 +409,7 @@ if __name__ == "__main__": os.system("cls" if os.name == "nt" else "clear") # Example: list endpoints - por = Portainer(cur_config["PORTAINER_SITE"], timeout=args.timeout) + por = Portainer(cur_config["PORTAINER_SITE"], args) por.set_defaults(cur_config) if args.debug: por._debug = True @@ -548,6 +519,9 @@ if __name__ == "__main__": sys.exit() if args.action == "update_service": + + + args = prompt_missing_args( args, cur_config,