This commit is contained in:
2025-12-04 23:13:54 +01:00
parent f328118f65
commit 493f16281c
2 changed files with 88 additions and 76 deletions

View File

@@ -110,6 +110,10 @@ class Portainer:
self.get_stacks()
self.get_containers()
def set_defaults(self, config):
'''Set default configuration from provided config dictionary.'''
self.cur_config = config
def get_site(self, site):
if site == "portainer":
self.base_url = os.getenv(
@@ -698,6 +702,7 @@ class Portainer:
stacks = self.get_stacks()
count = 0
data = []
stack_names = []
for stack in stacks:
if endpoint is not None:
if not stack["EndpointId"] in self.endpoints["by_id"]:
@@ -706,6 +711,7 @@ class Portainer:
if self.endpoints["by_name"][endpoint] != stack["EndpointId"]:
continue
try:
stack_names.append(stack["Name"])
data.append(
[
stack["Id"],
@@ -723,6 +729,7 @@ class Portainer:
headers = ["StackID", "Name", "Endpoint"]
print(tabulate.tabulate(data, headers=headers, tablefmt="github"))
print(f"Total stacks: {count}")
# print(sorted(stack_names))
def start_stack(self, stack=None, endpoint_id=None):
"""Start one stack or all stacks on an endpoint."""

View File

@@ -30,7 +30,9 @@ defaults = {
"site": "portainer",
}
cur_config = {}
if os.path.exists("/myapps/portainer.conf"):
def load_config(defaults=defaults):
'''Load configuration from /myapps/portainer.conf if it exists, else from env vars or defaults.'''
if os.path.exists("/myapps/portainer.conf"):
with open("/myapps/portainer.conf", "r") as f:
conf_data = f.read()
for line in conf_data.split("\n"):
@@ -39,42 +41,51 @@ if os.path.exists("/myapps/portainer.conf"):
key, value = line.split("=", 1)
os.environ[key.strip()] = value.strip()
cur_config[key.strip()] = value.strip()
else:
else:
print("No /myapps/portainer.conf file found, proceeding with env vars.")
os.makedirs("/myapps", exist_ok=True)
for field in defaults.keys():
value_in = os.getenv(f"PORTAINER_{field.upper()}")
if value_in is not None:
os.environ[field] = value_in
cur_config[field] = value_in
os.environ[f"PORTAINER_{field.upper()}"] = value_in
cur_config[f"PORTAINER_{field.upper()}"] = value_in
else:
os.environ[field] = defaults[field]
cur_config[field] = defaults[field]
os.environ[f"PORTAINER_{field.upper()}"] = defaults[field]
cur_config[f"PORTAINER_{field.upper()}"] = defaults[field]
conf_data = "\n".join(f"PORTAINER_{k.upper()}={v}" for k, v in cur_config.items())
conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items())
# print("Using the following configuration:")
# print(conf_data)
with open("/myapps/portainer.conf", "w") as f:
f.write(conf_data)
print("Configuration written to /myapps/portainer.conf")
return cur_config
if os.getenv("PORTAINER_SITE") is not None:
defaults["site"] = os.getenv("PORTAINER_SITE")
if os.getenv("PORTAINER_ENDPOINT_ID") is not None:
defaults["endpoint_id"] = os.getenv("PORTAINER_ENDPOINT_ID")
if os.getenv("PORTAINER_STACK") is not None:
defaults["stack"] = os.getenv("PORTAINER_STACK")
if os.getenv("PORTAINER_DEPLOY_MODE") is not None:
defaults["deploy_mode"] = os.getenv("PORTAINER_DEPLOY_MODE")
if os.getenv("PORTAINER_STACK_MODE") is not None:
defaults["stack_mode"] = os.getenv("PORTAINER_STACK_MODE")
print(cur_config)
cur_config = load_config(defaults)
# ENV_VARS = [
# "PORTAINER_URL",
# "PORTAINER_SITE",
# "PORTAINER_ENDPOINT_ID",
# "PORTAINER_STACK",
# "PORTAINER_DEPLOY_MODE",
# "PORTAINER_STACK_MODE",
# ]
def update_configs(cur_config):
'''Update defaults from environment variables if set.'''
conf_data = "\n".join(f"{k.upper()}={v}" for k, v in cur_config.items())
# print("Using the following configuration:")
# print(conf_data)
with open("/myapps/portainer.conf", "w") as f:
f.write(conf_data)
print("Configuration written to /myapps/portainer.conf")
parser = argparse.ArgumentParser(
description="Portainer helper - use env vars or pass credentials."
)
@@ -139,6 +150,23 @@ args = parser.parse_args()
print("Running version:", VERSION)
print("Environment:", args.site)
if args.site is not None:
cur_config["PORTAINER_SITE"] = args.site
if args.endpoint_id is not None:
cur_config["PORTAINER_ENDPOINT_ID"] = args.endpoint_id
if args.stack is not None:
cur_config["PORTAINER_STACK"] = args.stack
if args.deploy_mode is not None:
cur_config["PORTAINER_DEPLOY_MODE"] = args.deploy_mode
if args.stack_mode is not None:
cur_config["PORTAINER_STACK_MODE"] = args.stack_mode
update_configs(cur_config)
if args.debug:
input(cur_config)
_LOG_LEVEL = "INFO"
LOG_FILE = "/tmp/portainer.log"
if _LOG_LEVEL == "DEBUG":
@@ -188,42 +216,6 @@ def prompt_missing_args(args_in, defaults_in, fields):
fields = [("arg_name", "Prompt text")]
"""
def input_with_default(prompt, default, longest):
full_prompt = f" >> {prompt:{longest}}"
sys.stdout.write(full_prompt)
sys.stdout.flush()
checkmark = "\033[92m\u2714\033[0m"
# read raw input char-by-char
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(fd)
user_input = ""
while True:
ch = sys.stdin.read(1)
if ch in ("\n", "\r"): # ENTER
break
elif ch == "\x7f": # BACKSPACE
if user_input:
user_input = user_input[:-1]
sys.stdout.write("\b \b")
sys.stdout.flush()
else:
user_input += ch
sys.stdout.write(ch)
sys.stdout.flush()
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
if not user_input:
user_input = default
# rewrite final line cleanly
sys.stdout.write(f"\r{full_prompt}{user_input:10} {checkmark}\n")
sys.stdout.flush()
return user_input
longest = 0
for field, text in fields:
a = text + " (default= " + cur_config["PORTAINER_" + field.upper()] + ")"
@@ -238,7 +230,6 @@ def prompt_missing_args(args_in, defaults_in, fields):
if default is not None:
prompt_text = f"{text} (default={default}) : "
# value_in = input(prompt) or default
if field == "site":
commands = ["portainer", "port"]
elif field == "deploy_mode":
@@ -249,7 +240,13 @@ def prompt_missing_args(args_in, defaults_in, fields):
commands = por.endpoints_names
elif field == "stack":
if args.action == "create_stack":
commands = []
commands = [
'authentik', 'bitwarden', 'bookstack', 'dockermon', 'fail2ban', 'gitea', 'gitlab', 'grafana',
'home-assistant', 'homepage', 'immich', 'influxdb', 'jupyter', 'kestra', 'mailu3',
'mealie', 'mediacenter', 'mosquitto', 'motioneye', 'n8n', 'nextcloud', 'nginx',
'node-red', 'octoprint', 'ollama', 'pihole', 'portainer-ce', 'rancher', 'registry',
'regsync', 'semaphore', 'unifibrowser', 'uptime-kuma', 'watchtower', 'wazuh', 'webhub',
'wud', 'zabbix-server']
else:
commands = []
if por._debug:
@@ -326,7 +323,6 @@ def prompt_missing_args(args_in, defaults_in, fields):
return args
print(cur_config)
if __name__ == "__main__":
# Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below.
# token = get_portainer_token(base,"admin","l4c1j4yd33Du5lo") # or get_portainer_token(base, "admin", "secret")
@@ -364,7 +360,8 @@ if __name__ == "__main__":
args.action = actions[int(ans) - 1]
os.system("cls" if os.name == "nt" else "clear")
# Example: list endpoints
por = Portainer(defaults["site"], timeout=args.timeout)
por = Portainer(cur_config["PORTAINER_SITE"], timeout=args.timeout)
por.set_defaults(cur_config)
if args.debug:
por._debug = True
if args.action == "secrets":
@@ -452,8 +449,16 @@ if __name__ == "__main__":
sys.exit()
if args.action == "list_stacks":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID"),
],
)
por.print_stacks(args.endpoint_id)
print(json.dumps(por.all_data, indent=2))
# print(json.dumps(por.all_data, indent=2))
sys.exit()
if args.action == "list_containers":