diff --git a/port.py b/port.py index 195763e..3cf37a5 100644 --- a/port.py +++ b/port.py @@ -110,6 +110,10 @@ class Portainer: self.get_stacks() self.get_containers() + def set_defaults(self, config): + '''Set default configuration from provided config dictionary.''' + self.cur_config = config + def get_site(self, site): if site == "portainer": self.base_url = os.getenv( @@ -698,6 +702,7 @@ class Portainer: stacks = self.get_stacks() count = 0 data = [] + stack_names = [] for stack in stacks: if endpoint is not None: if not stack["EndpointId"] in self.endpoints["by_id"]: @@ -706,6 +711,7 @@ class Portainer: if self.endpoints["by_name"][endpoint] != stack["EndpointId"]: continue try: + stack_names.append(stack["Name"]) data.append( [ stack["Id"], @@ -723,6 +729,7 @@ class Portainer: headers = ["StackID", "Name", "Endpoint"] print(tabulate.tabulate(data, headers=headers, tablefmt="github")) print(f"Total stacks: {count}") + # print(sorted(stack_names)) def start_stack(self, stack=None, endpoint_id=None): """Start one stack or all stacks on an endpoint.""" diff --git a/portainer.py b/portainer.py index 09cb054..62b35ab 100755 --- a/portainer.py +++ b/portainer.py @@ -30,51 +30,62 @@ defaults = { "site": "portainer", } cur_config = {} -if os.path.exists("/myapps/portainer.conf"): - with open("/myapps/portainer.conf", "r") as f: - conf_data = f.read() - for line in conf_data.split("\n"): - if line.startswith("#") or line.strip() == "": - continue - key, value = line.split("=", 1) - os.environ[key.strip()] = value.strip() - cur_config[key.strip()] = value.strip() -else: +def load_config(defaults=defaults): + '''Load configuration from /myapps/portainer.conf if it exists, else from env vars or defaults.''' + if os.path.exists("/myapps/portainer.conf"): + with open("/myapps/portainer.conf", "r") as f: + conf_data = f.read() + for line in conf_data.split("\n"): + if line.startswith("#") or line.strip() == "": + continue + key, value = line.split("=", 1) + os.environ[key.strip()] = value.strip() + cur_config[key.strip()] = value.strip() + else: + print("No /myapps/portainer.conf file found, proceeding with env vars.") + os.makedirs("/myapps", exist_ok=True) - print("No /myapps/portainer.conf file found, proceeding with env vars.") - os.makedirs("/myapps", exist_ok=True) + for field in defaults.keys(): + value_in = os.getenv(f"PORTAINER_{field.upper()}") + if value_in is not None: + os.environ[f"PORTAINER_{field.upper()}"] = value_in + cur_config[f"PORTAINER_{field.upper()}"] = value_in + else: + os.environ[f"PORTAINER_{field.upper()}"] = defaults[field] + cur_config[f"PORTAINER_{field.upper()}"] = defaults[field] - for field in defaults.keys(): - value_in = os.getenv(f"PORTAINER_{field.upper()}") - if value_in is not None: - os.environ[field] = value_in - cur_config[field] = value_in - else: - os.environ[field] = defaults[field] - cur_config[field] = defaults[field] - - conf_data = "\n".join(f"PORTAINER_{k.upper()}={v}" for k, v in cur_config.items()) + conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items()) # print("Using the following configuration:") - # print(conf_data) - with open("/myapps/portainer.conf", "w") as f: f.write(conf_data) print("Configuration written to /myapps/portainer.conf") + return cur_config -if os.getenv("PORTAINER_SITE") is not None: - defaults["site"] = os.getenv("PORTAINER_SITE") -if os.getenv("PORTAINER_ENDPOINT_ID") is not None: - defaults["endpoint_id"] = os.getenv("PORTAINER_ENDPOINT_ID") -if os.getenv("PORTAINER_STACK") is not None: - defaults["stack"] = os.getenv("PORTAINER_STACK") -if os.getenv("PORTAINER_DEPLOY_MODE") is not None: - defaults["deploy_mode"] = os.getenv("PORTAINER_DEPLOY_MODE") -if os.getenv("PORTAINER_STACK_MODE") is not None: - defaults["stack_mode"] = os.getenv("PORTAINER_STACK_MODE") -print(cur_config) + +cur_config = load_config(defaults) + +# ENV_VARS = [ +# "PORTAINER_URL", +# "PORTAINER_SITE", +# "PORTAINER_ENDPOINT_ID", +# "PORTAINER_STACK", +# "PORTAINER_DEPLOY_MODE", +# "PORTAINER_STACK_MODE", +# ] + + +def update_configs(cur_config): + '''Update defaults from environment variables if set.''' + conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items()) + # print("Using the following configuration:") + # print(conf_data) + with open("/myapps/portainer.conf", "w") as f: + f.write(conf_data) + print("Configuration written to /myapps/portainer.conf") + parser = argparse.ArgumentParser( description="Portainer helper - use env vars or pass credentials." ) @@ -139,6 +150,23 @@ args = parser.parse_args() print("Running version:", VERSION) print("Environment:", args.site) + +if args.site is not None: + cur_config["PORTAINER_SITE"] = args.site +if args.endpoint_id is not None: + cur_config["PORTAINER_ENDPOINT_ID"] = args.endpoint_id +if args.stack is not None: + cur_config["PORTAINER_STACK"] = args.stack +if args.deploy_mode is not None: + cur_config["PORTAINER_DEPLOY_MODE"] = args.deploy_mode +if args.stack_mode is not None: + cur_config["PORTAINER_STACK_MODE"] = args.stack_mode + +update_configs(cur_config) + +if args.debug: + input(cur_config) + _LOG_LEVEL = "INFO" LOG_FILE = "/tmp/portainer.log" if _LOG_LEVEL == "DEBUG": @@ -188,42 +216,6 @@ def prompt_missing_args(args_in, defaults_in, fields): fields = [("arg_name", "Prompt text")] """ - def input_with_default(prompt, default, longest): - full_prompt = f" >> {prompt:{longest}}" - sys.stdout.write(full_prompt) - sys.stdout.flush() - checkmark = "\033[92m\u2714\033[0m" - # read raw input char-by-char - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - try: - tty.setraw(fd) - user_input = "" - while True: - ch = sys.stdin.read(1) - if ch in ("\n", "\r"): # ENTER - break - elif ch == "\x7f": # BACKSPACE - if user_input: - user_input = user_input[:-1] - sys.stdout.write("\b \b") - sys.stdout.flush() - else: - user_input += ch - sys.stdout.write(ch) - sys.stdout.flush() - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - - if not user_input: - user_input = default - - # rewrite final line cleanly - sys.stdout.write(f"\r{full_prompt}{user_input:10} {checkmark}\n") - sys.stdout.flush() - - return user_input - longest = 0 for field, text in fields: a = text + " (default= " + cur_config["PORTAINER_" + field.upper()] + ")" @@ -238,7 +230,6 @@ def prompt_missing_args(args_in, defaults_in, fields): if default is not None: prompt_text = f"{text} (default={default}) : " # value_in = input(prompt) or default - if field == "site": commands = ["portainer", "port"] elif field == "deploy_mode": @@ -249,7 +240,13 @@ def prompt_missing_args(args_in, defaults_in, fields): commands = por.endpoints_names elif field == "stack": if args.action == "create_stack": - commands = [] + commands = [ + 'authentik', 'bitwarden', 'bookstack', 'dockermon', 'fail2ban', 'gitea', 'gitlab', 'grafana', + 'home-assistant', 'homepage', 'immich', 'influxdb', 'jupyter', 'kestra', 'mailu3', + 'mealie', 'mediacenter', 'mosquitto', 'motioneye', 'n8n', 'nextcloud', 'nginx', + 'node-red', 'octoprint', 'ollama', 'pihole', 'portainer-ce', 'rancher', 'registry', + 'regsync', 'semaphore', 'unifibrowser', 'uptime-kuma', 'watchtower', 'wazuh', 'webhub', + 'wud', 'zabbix-server'] else: commands = [] if por._debug: @@ -326,7 +323,6 @@ def prompt_missing_args(args_in, defaults_in, fields): return args -print(cur_config) if __name__ == "__main__": # Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below. # token = get_portainer_token(base,"admin","l4c1j4yd33Du5lo") # or get_portainer_token(base, "admin", "secret") @@ -364,7 +360,8 @@ if __name__ == "__main__": args.action = actions[int(ans) - 1] os.system("cls" if os.name == "nt" else "clear") # Example: list endpoints - por = Portainer(defaults["site"], timeout=args.timeout) + por = Portainer(cur_config["PORTAINER_SITE"], timeout=args.timeout) + por.set_defaults(cur_config) if args.debug: por._debug = True if args.action == "secrets": @@ -452,8 +449,16 @@ if __name__ == "__main__": sys.exit() if args.action == "list_stacks": + args = prompt_missing_args( + args, + cur_config, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID"), + ], + ) por.print_stacks(args.endpoint_id) - print(json.dumps(por.all_data, indent=2)) + # print(json.dumps(por.all_data, indent=2)) sys.exit() if args.action == "list_containers":