diff --git a/port.py b/port.py index cbfdf17..8facc04 100644 --- a/port.py +++ b/port.py @@ -20,8 +20,9 @@ class Portainer: Instantiate with base_url and optional token/timeout and call methods to perform API operations. """ + def __init__(self, base_url, token, timeout=10): - self.base_url = base_url.rstrip('/') + self.base_url = base_url.rstrip("/") self.token = token self.timeout = timeout self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git" @@ -43,7 +44,7 @@ class Portainer: "bitwarden", "mailu3", "home-assistant", - "homepage" + "homepage", ] self.nas_stacks = self.basic_stacks + [ "gitlab", @@ -54,42 +55,38 @@ class Portainer: "immich", "jupyter", "kestra", - "mealie" + "mealie", ] self.m_server_stacks = self.basic_stacks + [ - 'immich', - 'zabbix-server', - 'gitea', - 'unifibrowser', - 'mediacenter', - 'watchtower', - 'wazuh', - 'octoprint', - 'motioneye', - 'kestra', - 'bookstack', - 'wud', - 'uptime-kuma', - 'registry', - 'regsync', - 'dockermon', - 'grafana', - 'nextcloud', - 'semaphore', - 'node-red', - 'test', - 'jupyter', - 'paperless', - 'mealie', - 'n8n', - 'ollama', - 'rancher' - ] - self.rpi5_stacks = self.basic_stacks + [ - "gitlab", + "immich", + "zabbix-server", + "gitea", + "unifibrowser", + "mediacenter", + "watchtower", + "wazuh", + "octoprint", + "motioneye", + "kestra", "bookstack", - "gitea" + "wud", + "uptime-kuma", + "registry", + "regsync", + "dockermon", + "grafana", + "nextcloud", + "semaphore", + "node-red", + "test", + "jupyter", + "paperless", + "mealie", + "n8n", + "ollama", + "rancher", ] + self.rpi5_stacks = self.basic_stacks + ["gitlab", "bookstack", "gitea"] self.rack_stacks = self.basic_stacks + [ "gitlab", "bookstack", @@ -99,7 +96,7 @@ class Portainer: "immich", "jupyter", "kestra", - "mealie" + "mealie", ] self.log_mode = False self.hw_mode = False @@ -136,13 +133,11 @@ class Portainer: """Example authenticated GET request to Portainer API.""" url = f"{self.base_url.rstrip('/')}{path}" headers = {"X-API-Key": f"{self.token}"} - data = { - "EndpointId": endpoint_id, - "Name": name, - "Env": json.dumps(envs) - } + data = {"EndpointId": endpoint_id, "Name": name, "Env": json.dumps(envs)} # print(data) - resp = requests.post(url, headers=headers, files=file, data=data, timeout=timeout) + resp = requests.post( + url, headers=headers, files=file, data=data, timeout=timeout + ) resp.raise_for_status() return resp.json() @@ -182,36 +177,49 @@ class Portainer: for s in stacks: # print(type(s["AutoUpdate"]) ) # input(s) - if s['EndpointId'] in fail_endponts: + if s["EndpointId"] in fail_endponts: continue - if not s['EndpointId'] in webhooks: + if not s["EndpointId"] in webhooks: try: - webhooks[s['EndpointId']] = {"webhook": {}} - webhooks[self.endpoints["by_id"][s['EndpointId']]] = {"webhook": {}} + webhooks[s["EndpointId"]] = {"webhook": {}} + webhooks[self.endpoints["by_id"][s["EndpointId"]]] = {"webhook": {}} except Exception as e: - logger.debug(f"Exception while getting webhooks for endpoint {s['EndpointId']}: {e}") - if not s['EndpointId'] in self.stacks_all: - self.stacks_all[s['EndpointId']] = {"by_id": {}, "by_name": {}} - self.stacks_all[self.endpoints["by_id"][s['EndpointId']]] = {"by_id": {}, "by_name": {}} - self.stacks_all[s['EndpointId']]["by_id"][s['Id']] = s['Name'] - self.stacks_all[self.endpoints["by_id"][s['EndpointId']]]["by_id"][s['Id']] = s['Name'] + logger.debug( + f"Exception while getting webhooks for endpoint {s['EndpointId']}: {e}" + ) + if not s["EndpointId"] in self.stacks_all: + self.stacks_all[s["EndpointId"]] = {"by_id": {}, "by_name": {}} + self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]] = { + "by_id": {}, + "by_name": {}, + } + self.stacks_all[s["EndpointId"]]["by_id"][s["Id"]] = s["Name"] + self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_id"][ + s["Id"] + ] = s["Name"] - self.stacks_all[s['EndpointId']]["by_name"][s['Name']] = s['Id'] - self.stacks_all[self.endpoints["by_id"][s['EndpointId']]]["by_name"][s['Name']] = s['Id'] + self.stacks_all[s["EndpointId"]]["by_name"][s["Name"]] = s["Id"] + self.stacks_all[self.endpoints["by_id"][s["EndpointId"]]]["by_name"][ + s["Name"] + ] = s["Id"] # print(s) if "AutoUpdate" in s and s["AutoUpdate"] is not None: if type(s["AutoUpdate"]) is dict and "Webhook" in s["AutoUpdate"]: # print(self.endpoints["by_id"][s['EndpointId']], s['Name'], s["AutoUpdate"]['Webhook']) # print("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW") - webhooks[s['EndpointId']][s['Name']] = s['AutoUpdate']['Webhook'] - webhooks[self.endpoints["by_id"][s['EndpointId']]][s['Name']] = s['AutoUpdate']['Webhook'] + webhooks[s["EndpointId"]][s["Name"]] = s["AutoUpdate"]["Webhook"] + webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[ + "AutoUpdate" + ]["Webhook"] elif s["AutoUpdate"]["Webhook"] != "": - webhooks[s['EndpointId']][s['Name']] = s['Webhook'] - webhooks[self.endpoints["by_id"][s['EndpointId']]][s['Name']] = s['Webhook'] + webhooks[s["EndpointId"]][s["Name"]] = s["Webhook"] + webhooks[self.endpoints["by_id"][s["EndpointId"]]][s["Name"]] = s[ + "Webhook" + ] # print(self.stacks_all) - if s['EndpointId'] == endpoint_id or endpoint_id == "all": + if s["EndpointId"] == endpoint_id or endpoint_id == "all": stcks.append(s) # print(stcks) if stcks is None: @@ -269,7 +277,7 @@ class Portainer: for e in self.all_data["stacks"][s]["by_name"]: path = ( f"/endpoints/{s}/docker/containers/json" - f"?all=1&filters={{\"label\": [\"com.docker.compose.project={e}\"]}}" + f'?all=1&filters={{"label": ["com.docker.compose.project={e}"]}}' ) logging.info(f"request : {path}") try: @@ -285,11 +293,13 @@ class Portainer: if self.all_data["endpoints"]["by_id"][s] in data: data[self.all_data["endpoints"]["by_id"][s]][e] = contr else: - data[self.all_data["endpoints"]["by_id"][s]] = {e: contr} + data[self.all_data["endpoints"]["by_id"][s]] = { + e: contr + } except Exception as e: logger.debug( f"Exception while getting containers for stack {e} ", - f"on endpoint {self.all_data['endpoints']['by_id'][s]}: {e}" + f"on endpoint {self.all_data['endpoints']['by_id'][s]}: {e}", ) # print(data) self.all_data["containers"] = data @@ -307,10 +317,9 @@ class Portainer: def stop(c): print(f" > Stopping {c}") - self.api_post_no_body( - f"/endpoints/{ep_id}/docker/containers/{c}/stop" - ) + self.api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/stop") # print(f"✔") + with ThreadPoolExecutor(max_workers=10) as exe: exe.map(stop, containers) # for c in containers: @@ -323,9 +332,8 @@ class Portainer: def stop(c): print(f" > Starting {c}") - self.api_post_no_body( - f"/endpoints/{ep_id}/docker/containers/{c}/start" - ) + self.api_post_no_body(f"/endpoints/{ep_id}/docker/containers/{c}/start") + with ThreadPoolExecutor(max_workers=10) as exe: exe.map(stop, containers) @@ -340,10 +348,10 @@ class Portainer: # input(stcs) def update(c): print(f" > Updating {c[0]} on {endpoint}") - ans = self.api_post_no_body( - f"/stacks/webhooks/{c[1]}" + ans = self.api_post_no_body(f"/stacks/webhooks/{c[1]}") + logger.debug( + f"Update response for stack {c[0]} on endpoint {endpoint}: {ans}" ) - logger.debug(f"Update response for stack {c[0]} on endpoint {endpoint}: {ans}") def stop(): cont = [] @@ -368,10 +376,10 @@ class Portainer: eps = {"by_id": {}, "by_name": {}} eps_stats = {} for ep in endpoints: - eps['by_id'][ep['Id']] = ep['Name'] - eps['by_name'][ep['Name']] = ep['Id'] - eps_stats[ep['Id']] = ep['Status'] - eps_stats[ep['Name']] = ep['Status'] + eps["by_id"][ep["Id"]] = ep["Name"] + eps["by_name"][ep["Name"]] = ep["Id"] + eps_stats[ep["Id"]] = ep["Status"] + eps_stats[ep["Name"]] = ep["Status"] self.endpoints = eps self.all_data["endpoints"] = eps self.all_data["endpoints_status"] = eps_stats @@ -394,7 +402,7 @@ class Portainer: ep_id = self.endpoints["by_name"][endpoint] path = f"/endpoints/{ep_id}/docker/info" stats = self.api_get(path) - return stats['Swarm']['Cluster']['ID'] + return stats["Swarm"]["Cluster"]["ID"] def get_stack(self, stack=None, endpoint_id=None, timeout=None): self.get_stacks(endpoint_id) @@ -404,7 +412,7 @@ class Portainer: if stack == "all": for s in self.stacks: # print(s) - if (endpoint_id == s.get("EndpointId")): + if endpoint_id == s.get("EndpointId"): self.stack_ids.append(s.get("Id")) return self.stack_ids else: @@ -416,10 +424,9 @@ class Portainer: and endpoint_id == s.get("EndpointId") ) - match_by_name = ( - str(s.get("Name")) == str(stack) - and endpoint_id == int(s.get("EndpointId")) # Ensure types match for comparison - ) + match_by_name = str(s.get("Name")) == str(stack) and endpoint_id == int( + s.get("EndpointId") + ) # Ensure types match for comparison if match_by_id or match_by_name: # if (stack is not None and s.get("Id") == stack and endpoint_id == s.get("EndpointId")) @@ -431,7 +438,15 @@ class Portainer: raise ValueError(f"Stack not found: {stack}") - def create_stack(self, endpoint, stack=None, mode="git", autostart=False, swarm=False, timeout=None): + def create_stack( + self, + endpoint, + stack=None, + mode="git", + autostart=False, + swarm=False, + timeout=None, + ): if swarm: swarm_id = self.get_swarm_id(endpoint) p = "swarm" @@ -469,8 +484,8 @@ class Portainer: # Check if the stack exists by ID or name stack_check = ( - stack in self.stacks_all[self.endpoint_id]['by_id'] - or stack in self.stacks_all[self.endpoint_id]['by_name'] + stack in self.stacks_all[self.endpoint_id]["by_id"] + or stack in self.stacks_all[self.endpoint_id]["by_name"] ) if stack_check: print(f"Stack {stack} already exist") @@ -488,24 +503,24 @@ class Portainer: name, value = ev.split("=", 1) envs.append({"name": name, "value": value}) f.close() - # wl(envs) + # wl(envs) for e in envs: # print(f"Env: {e['name']} = {e['value']}") HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"] - if e['name'] == "RESTART" and self.endpoint_name == "m-server": - e['value'] = "always" - if e['name'] in HWS: + if e["name"] == "RESTART" and self.endpoint_name == "m-server": + e["value"] = "always" + if e["name"] in HWS: # print("Found HW_MODE env var.") if self.hw_mode: - e['value'] = "hw" + e["value"] = "hw" else: - e['value'] = "cpu" - if e['name'] == "LOGGING": + e["value"] = "cpu" + if e["name"] == "LOGGING": # print("Found LOGGING env var.") if self.log_mode: - e['value'] = "journald" + e["value"] = "journald" else: - e['value'] = "syslog" + e["value"] = "syslog" uid = uuid.uuid4() # print(uid) @@ -516,7 +531,7 @@ class Portainer: "AutoUpdate": { "forcePullImage": True, "forceUpdate": False, - "webhook": f"{uid}" + "webhook": f"{uid}", }, "repositoryURL": "https://gitlab.sectorq.eu/home/docker-compose.git", "ReferenceName": "refs/heads/main", @@ -539,7 +554,7 @@ class Portainer: "RegistryID": 4, "isDetachedFromGit": True, "method": "repository", - "swarmID": None + "swarmID": None, } if swarm: req["type"] = "swarm" @@ -564,11 +579,16 @@ class Portainer: created = True break except Exception as e: - print(f"Waiting for stack {stack} to be created...{tries}/50", end="\r") + print( + f"Waiting for stack {stack} to be created...{tries}/50", + end="\r", + ) time.sleep(10) tries += 1 if tries > 50: - print(f"Error retrieving stack {stack} after creation: {self.endpoint_name}") + print( + f"Error retrieving stack {stack} after creation: {self.endpoint_name}" + ) break logger.debug(f"Exception while getting stack {stack}: {e}") @@ -611,28 +631,31 @@ class Portainer: name, value = ev.split("=", 1) envs.append({"name": name, "value": value}) f.close() - # wl(envs) + # wl(envs) for e in envs: # print(f"Env: {e['name']} = {e['value']}") HWS = ["HW_MODE", "HW_MODE1", "HW_MODE2"] - if e['name'] == "RESTART" and self.endpoint_name == "m-server": - e['value'] = "always" - if e['name'] in HWS: + if e["name"] == "RESTART" and self.endpoint_name == "m-server": + e["value"] = "always" + if e["name"] in HWS: print("Found HW_MODE env var.") if self.hw_mode: - e['value'] = "hw" + e["value"] = "hw" else: - e['value'] = "cpu" - if e['name'] == "LOGGING": + e["value"] = "cpu" + if e["name"] == "LOGGING": print("Found LOGGING env var.") if self.log_mode: - e['value'] = "journald" + e["value"] = "journald" else: - e['value'] = "syslog" + e["value"] = "syslog" file = { # ("filename", file_object) - "file": ("docker-compose.yml", open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb")), + "file": ( + "docker-compose.yml", + open(f"/tmp/docker-compose/{stack}/docker-compose.yml", "rb"), + ), } self.api_post_file(path, self.endpoint_id, stack, envs, file) @@ -642,16 +665,24 @@ class Portainer: data = [] for stack in stacks: if endpoint is not None: - if not stack['EndpointId'] in self.endpoints['by_id']: + if not stack["EndpointId"] in self.endpoints["by_id"]: continue if endpoint != "all": - if self.endpoints['by_name'][endpoint] != stack['EndpointId']: + if self.endpoints["by_name"][endpoint] != stack["EndpointId"]: continue try: - data.append([stack['Id'], stack['Name'], self.endpoints['by_id'][stack['EndpointId']]]) + data.append( + [ + stack["Id"], + stack["Name"], + self.endpoints["by_id"][stack["EndpointId"]], + ] + ) except KeyError as e: - data.append([stack['Id'], stack['Name'], "?"]) - logger.debug(f"KeyError getting endpoint name for stack {stack['Name']}: {e}") + data.append([stack["Id"], stack["Name"], "?"]) + logger.debug( + f"KeyError getting endpoint name for stack {stack['Name']}: {e}" + ) count += 1 headers = ["StackID", "Name", "Endpoint"] @@ -674,9 +705,13 @@ class Portainer: print(f"Error stoping stack: {e}") return [] if "Id" in json.loads(resp): - print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : started") + print( + f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : started" + ) else: - print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}") + print( + f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}" + ) return True def stop_stack(self, stack, endpoint_id): @@ -698,9 +733,13 @@ class Portainer: print(f"Error stopping stack: {e}") return [] if "Id" in json.loads(resp): - print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : stopped") + print( + f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : stopped" + ) else: - print(f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}") + print( + f"Stack {self.stacks_all[self.endpoint_id]['by_id'][stack]} : {json.loads(resp)['message']}" + ) return True def delete_stack(self, endpoint_id=None, stack=None, timeout=None): @@ -728,13 +767,13 @@ class Portainer: for s in stacks: # print(f"Delete stack {s['Name']}") # print(s['EndpointId'], endpoint_id) - if int(s['EndpointId']) != int(endpoint_id): + if int(s["EndpointId"]) != int(endpoint_id): continue # print("Deleting stack:", s['Name']) path = f"/stacks/{s['Id']}" if endpoint_id is not None: path += f"?endpointId={endpoint_id}&removeVolumes=true" - paths.append([self.get_endpoint_name(endpoint_id), s['Name'], path]) + paths.append([self.get_endpoint_name(endpoint_id), s["Name"], path]) # input(paths) def delete(c): @@ -779,8 +818,5 @@ class Portainer: endpoint_id = int(self.endpoints["by_name"][endpoint_id]) path = f"/endpoints/{endpoint_id}/docker/secrets/create" encoded = base64.b64encode(value.encode()).decode() - data = { - "Name": name, - "Data": encoded - } + data = {"Name": name, "Data": encoded} self.api_post(path, data, timeout=timeout) diff --git a/portainer.py b/portainer.py index c7491c7..753102a 100755 --- a/portainer.py +++ b/portainer.py @@ -7,6 +7,7 @@ import argparse from tabulate import tabulate from port import Portainer import logging + VERSION = "0.0.1" defaults = { @@ -15,42 +16,67 @@ defaults = { "deploy_mode": "git", "autostart": True, "stack_mode": "swarm", - "site": "portainer" + "site": "portainer", } -parser = argparse.ArgumentParser(description="Portainer helper - use env vars or pass credentials.") -parser.add_argument("--base", - "-b", - default=os.getenv("PORTAINER_URL", "https://portainer.example.com"), - help="Base URL for Portainer (ENV: PORTAINER_URL)" - ) +parser = argparse.ArgumentParser( + description="Portainer helper - use env vars or pass credentials." +) +parser.add_argument( + "--base", + "-b", + default=os.getenv("PORTAINER_URL", "https://portainer.example.com"), + help="Base URL for Portainer (ENV: PORTAINER_URL)", +) parser.add_argument("--site", "-t", type=str, default=None, help="Site") -parser.add_argument("--endpoint-id", "-e", type=str, default=None, help="Endpoint ID to limit stack operations") -parser.add_argument("--refresh-environment", "-R", action="store_true", help="List endpoints") -parser.add_argument("--list-endpoints", "-E", action="store_true", help="List endpoints") +parser.add_argument( + "--endpoint-id", + "-e", + type=str, + default=None, + help="Endpoint ID to limit stack operations", +) +parser.add_argument( + "--refresh-environment", "-R", action="store_true", help="List endpoints" +) +parser.add_argument( + "--list-endpoints", "-E", action="store_true", help="List endpoints" +) parser.add_argument("--list-stacks", "-l", action="store_true", help="List stacks") parser.add_argument("--print-all-data", "-A", action="store_true", help="List stacks") -parser.add_argument("--list-containers", "-c", action="store_true", help="List containers") +parser.add_argument( + "--list-containers", "-c", action="store_true", help="List containers" +) parser.add_argument("--update-stack", "-U", action="store_true", help="Update stacks") -parser.add_argument("--stop-containers", "-O", action="store_true", help="Stop containers") -parser.add_argument("--start-containers", "-X", action="store_true", help="Start containers") +parser.add_argument( + "--stop-containers", "-O", action="store_true", help="Stop containers" +) +parser.add_argument( + "--start-containers", "-X", action="store_true", help="Start containers" +) parser.add_argument("--update-status", "-S", action="store_true", help="Update status") -parser.add_argument("--get-stack", metavar="NAME_OR_ID", help="Get stack by name or numeric id") +parser.add_argument( + "--get-stack", metavar="NAME_OR_ID", help="Get stack by name or numeric id" +) parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform") -parser.add_argument("--autostart", "-Z", action="store_true", help="Auto-start created stacks") -parser.add_argument("--start-stack", "-x", action='store_true') -parser.add_argument("--stop-stack", "-o", action='store_true') -parser.add_argument("--secrets", "-q", action='store_true') -parser.add_argument("--debug", "-D", action='store_true') -parser.add_argument("--create-stack", "-n", action='store_true') -parser.add_argument("--create-stack_new2", "-N", action='store_true') -parser.add_argument("--gpu", "-g", action='store_true') -parser.add_argument("--create-stacks", "-C", action='store_true') -parser.add_argument("--refresh-status", "-r", action='store_true') +parser.add_argument( + "--autostart", "-Z", action="store_true", help="Auto-start created stacks" +) +parser.add_argument("--start-stack", "-x", action="store_true") +parser.add_argument("--stop-stack", "-o", action="store_true") +parser.add_argument("--secrets", "-q", action="store_true") +parser.add_argument("--debug", "-D", action="store_true") +parser.add_argument("--create-stack", "-n", action="store_true") +parser.add_argument("--create-stack_new2", "-N", action="store_true") +parser.add_argument("--gpu", "-g", action="store_true") +parser.add_argument("--create-stacks", "-C", action="store_true") +parser.add_argument("--refresh-status", "-r", action="store_true") parser.add_argument("--stack", "-s", type=str, help="Stack ID for operations") -parser.add_argument("--token-only", action="store_true", help="Print auth token and exit") +parser.add_argument( + "--token-only", action="store_true", help="Print auth token and exit" +) parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds") parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode") parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode") @@ -64,32 +90,32 @@ if _LOG_LEVEL == "DEBUG": logging.basicConfig( filename=LOG_FILE, level=logging.DEBUG, - format='%(asctime)s : %(levelname)s : %(message)s', - datefmt='%m/%d/%Y %I:%M:%S %p' + format="%(asctime)s : %(levelname)s : %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", ) - logging.debug('using debug logging') + logging.debug("using debug logging") elif _LOG_LEVEL == "ERROR": logging.basicConfig( filename=LOG_FILE, level=logging.ERROR, - format='%(asctime)s : %(levelname)s : %(message)s', - datefmt='%m/%d/%Y %I:%M:%S %p' + format="%(asctime)s : %(levelname)s : %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", ) - logging.info('using error logging') + logging.info("using error logging") elif _LOG_LEVEL == "SCAN": logging.basicConfig( filename=LOG_FILE, level=logging.DEBUG, - format='%(asctime)s : %(levelname)s : %(message)s', - datefmt='%m/%d/%Y %I:%M:%S %p' + format="%(asctime)s : %(levelname)s : %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", ) - logging.info('using scan logging') + logging.info("using scan logging") else: logging.basicConfig( filename=LOG_FILE, level=logging.INFO, - format='%(asctime)s : %(levelname)s : %(message)s', - datefmt='%m/%d/%Y %I:%M:%S %p' + format="%(asctime)s : %(levelname)s : %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", ) logging.info("script started") @@ -125,10 +151,14 @@ def get_portainer_token(base_url, username=None, password=None, timeout=10): username = username or os.getenv("PORTAINER_USER") password = password or os.getenv("PORTAINER_PASS") if not username or not password: - raise ValueError("Username and password must be provided (or set PORTAINER_USER / PORTAINER_PASS).") + raise ValueError( + "Username and password must be provided (or set PORTAINER_USER / PORTAINER_PASS)." + ) url = f"{base_url.rstrip('/')}/api/auth" - resp = requests.post(url, json={"Username": username, "Password": password}, timeout=timeout) + resp = requests.post( + url, json={"Username": username, "Password": password}, timeout=timeout + ) resp.raise_for_status() data = resp.json() token = data.get("jwt") or data.get("JWT") or data.get("token") @@ -161,28 +191,30 @@ if __name__ == "__main__": # Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below. # token = get_portainer_token(base,"admin","l4c1j4yd33Du5lo") # or get_portainer_token(base, "admin", "secret") if args.action is None: - actions = ["delete_stack", - "create_stack", - "stop_stack", - "start_stack", - "list_stacks", - "update_stack", - "secrets", - "print_all_data", - "list_endpoints", - "list_containers", - "stop_containers", - "start_containers", - "refresh_environment", - "refresh_status", - "update_status"] + actions = [ + "delete_stack", + "create_stack", + "stop_stack", + "start_stack", + "list_stacks", + "update_stack", + "secrets", + "print_all_data", + "list_endpoints", + "list_containers", + "stop_containers", + "start_containers", + "refresh_environment", + "refresh_status", + "update_status", + ] print("Possible actions: ") i = 1 for a in actions: print(f" > {i}. {a}") i += 1 ans = input("\nSelect action to perform: ") - args.action = actions[int(ans)-1] + args.action = actions[int(ans) - 1] token = portainer_api_key # Example: list endpoints @@ -197,7 +229,7 @@ if __name__ == "__main__": "influxdb2-admin-token": "l4c1j4yd33Du5lo", "ha_influxdb2_admin_token": "l4c1j4yd33Du5lo", "wordpress_db_password": "wordpress", - "wordpress_root_db_password": "wordpress" + "wordpress_root_db_password": "wordpress", } for key, value in secrets.items(): res = por.create_secret(key, value, args.endpoint_id, args.timeout) @@ -205,23 +237,40 @@ if __name__ == "__main__": sys.exit() if args.action == "delete_stack": - args = prompt_missing_args(args, defaults, [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID"), - ("stack", "Stack name or ID") - ]) - por.delete_stack(args.endpoint_id, args.stack,) + args = prompt_missing_args( + args, + defaults, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID"), + ("stack", "Stack name or ID"), + ], + ) + por.delete_stack( + args.endpoint_id, + args.stack, + ) sys.exit() if args.action == "create_stack": - args = prompt_missing_args(args, defaults, [ - ("site", "Site"), - ("endpoint_id", "Endpoint ID"), - ("stack", "Stack name or ID"), - ("stack_mode", "Stack mode (swarm or compose)"), - ("deploy_mode", "Deploy mode (git or upload)") - ]) - por.create_stack(args.endpoint_id, args.stack, args.deploy_mode, args.autostart, args.stack_mode) + args = prompt_missing_args( + args, + defaults, + [ + ("site", "Site"), + ("endpoint_id", "Endpoint ID"), + ("stack", "Stack name or ID"), + ("stack_mode", "Stack mode (swarm or compose)"), + ("deploy_mode", "Deploy mode (git or upload)"), + ], + ) + por.create_stack( + args.endpoint_id, + args.stack, + args.deploy_mode, + args.autostart, + args.stack_mode, + ) sys.exit() if args.action == "stop_stack":