Compare commits

...

16 Commits

Author SHA1 Message Date
04f5a059a4 build 2025-12-13 22:15:40 +01:00
24492e1ec9 build 2025-12-13 21:42:38 +01:00
54da7d2764 build 2025-12-13 19:32:29 +01:00
2d3ca53c08 build 2025-12-13 13:38:22 +01:00
87a088bfb0 build 2025-12-13 13:36:15 +01:00
92b24e472e Update .gitlab-ci.yml file 2025-12-13 13:35:22 +01:00
42f82ef69e build 2025-12-13 13:33:02 +01:00
506aa0a903 Update .gitlab-ci.yml file 2025-12-13 13:32:42 +01:00
6c4222ac16 build 2025-12-13 13:30:30 +01:00
9be4051720 build 2025-12-12 17:46:33 +01:00
2319a13554 Update .gitlab-ci.yml file 2025-12-12 17:45:00 +01:00
b434887ff9 build 2025-12-12 17:13:57 +01:00
4c8beff0c8 build 2025-12-12 15:58:03 +01:00
d6842eab62 build 2025-12-12 15:19:53 +01:00
5864085ec3 build 2025-12-12 10:48:09 +01:00
04248ce279 build 2025-12-12 10:39:06 +01:00
3 changed files with 404 additions and 145 deletions

View File

@@ -7,9 +7,9 @@ variables:
GIT_SSH_COMMAND: "ssh -i /home/gitlab-runner/.ssh/id_rsa -o IdentitiesOnly=yes" GIT_SSH_COMMAND: "ssh -i /home/gitlab-runner/.ssh/id_rsa -o IdentitiesOnly=yes"
lint: lint:
stage: lint stage: lint
# image: python:3.12 image: r.sectorq.eu/jaydee/builder-portainer:latest
before_script: before_script:
- python3 -m pip install --break-system-packages flake8 black pylint tabulate prompt_toolkit - python3 -m pip install --break-system-packages flake8 black pylint tabulate prompt_toolkit hvac
- export PATH="$PATH:/home/gitlab-runner/.local/bin" - export PATH="$PATH:/home/gitlab-runner/.local/bin"
# - echo "PATH is now: $PATH" # - echo "PATH is now: $PATH"
script: script:
@@ -27,9 +27,9 @@ build-job: # This job runs in the build stage, which runs first.
- echo "$SSH_PRIVATE_KEY" | tr -d '\r' > ~/.ssh/id_rsa - echo "$SSH_PRIVATE_KEY" | tr -d '\r' > ~/.ssh/id_rsa
- chmod 600 ~/.ssh/id_rsa - chmod 600 ~/.ssh/id_rsa
- pyinstaller --onefile portainer.py - pyinstaller --onefile portainer.py
- scp -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null dist/portainer jd@192.168.80.222:/myapps/bin/ || true #- scp -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null dist/portainer jd@192.168.80.222:/myapps/bin/ || true
- scp -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null dist/portainer jd@morefine.home.lan:/myapps/bin/ || true - scp -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null dist/portainer jd@192.168.77.12:/myapps/bin/ || true
- scp -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null dist/portainer jd@m-server.home.lan:/myapps/bin/ || true - scp -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null dist/portainer jd@192.168.77.101:/myapps/bin/ || true
- rm -rf /home/gitlab-runner/builds/1fLwHSKm2/0/jaydee/portainer.tmp - rm -rf /home/gitlab-runner/builds/1fLwHSKm2/0/jaydee/portainer.tmp
artifacts: artifacts:
paths: paths:

436
port.py
View File

@@ -12,6 +12,7 @@ import base64
import tabulate import tabulate
from git import Repo from git import Repo
import requests import requests
import hvac
from prompt_toolkit import prompt from prompt_toolkit import prompt
from prompt_toolkit.completion import WordCompleter from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.shortcuts import checkboxlist_dialog from prompt_toolkit.shortcuts import checkboxlist_dialog
@@ -27,9 +28,10 @@ class Portainer:
to perform API operations. to perform API operations.
""" """
def __init__(self, site, timeout=10): def __init__(self, site, args=None, timeout=120):
self.base_url = None self.base_url = None
self.token = None self.token = None
self.args = args
self.action = None self.action = None
self._debug = False self._debug = False
self.timeout = timeout self.timeout = timeout
@@ -39,7 +41,7 @@ class Portainer:
self.stack_id = None self.stack_id = None
self.stack_ids = [] self.stack_ids = []
self.endpoint_name = None self.endpoint_name = None
self.endpoint_id = None self.endpoint_id = args.endpoint_id
# self.git_url = "https://gitlab.sectorq.eu/home/docker-compose.git" # self.git_url = "https://gitlab.sectorq.eu/home/docker-compose.git"
self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git" self.git_url = "git@gitlab.sectorq.eu:home/docker-compose.git"
@@ -113,9 +115,8 @@ class Portainer:
self.get_site(site) self.get_site(site)
self.get_endpoints() self.get_endpoints()
self.get_stacks() self.get_stacks()
self.get_containers() self.refresh_in_containers()
def set_defaults(self, config): def set_defaults(self, config):
'''Set default configuration from provided config dictionary.''' '''Set default configuration from provided config dictionary.'''
self.cur_config = config self.cur_config = config
@@ -125,14 +126,17 @@ class Portainer:
self.base_url = os.getenv( self.base_url = os.getenv(
"PORTAINER_URL", "https://portainer.sectorq.eu/api" "PORTAINER_URL", "https://portainer.sectorq.eu/api"
) )
self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc=" # self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc="
token_path = "portainer/token"
self.token = self.args.client.secrets.kv.v2.read_secret_version(path=token_path)['data']['data']['value']
elif site == "port": elif site == "port":
self.base_url = os.getenv("PORTAINER_URL", "https://port.sectorq.eu/api") self.base_url = os.getenv("PORTAINER_URL", "https://port.sectorq.eu/api")
self.token = "ptr_/5RkMCT/j3BTaL32vMSDtXFi76yOXRKVFOrUtzMsl5Y=" token_path = "port/token"
self.token = self.args.client.secrets.kv.v2.read_secret_version(path=token_path)['data']['data']['value']
else: else:
self.base_url = os.getenv( self.base_url = os.getenv(
"PORTAINER_URL", "https://portainer.sectorq.eu/api" "PORTAINER_URL", "https://portainer.sectorq.eu/api"
) )
self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc=" self.token = "ptr_GCNUoFcTOaXm7k8ZxPdQGmrFIamxZPTydbserYofMHc="
self.get_endpoints() self.get_endpoints()
self.get_stacks() self.get_stacks()
@@ -145,11 +149,30 @@ class Portainer:
except ValueError: except ValueError:
return False return False
def gotify_message(self, message):
payload = {
"title": "Updates in Portainer",
"message": message,
"priority": 5
}
'''Send a notification message via Gotify.'''
response = requests.post(
"https://gotify.sectorq.eu/message",
data=payload,
headers={"X-Gotify-Key": "ASn_fIAd5OVjm8c"}
)
# print("Status:", response.status_code)
# print("Response:", response.text)
pass
def _api_get(self, path, timeout=120): def _api_get(self, path, timeout=120):
url = f"{self.base_url.rstrip('/')}{path}" url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"} headers = {"X-API-Key": f"{self.token}"}
resp = requests.get(url, headers=headers, timeout=timeout) resp = requests.get(url, headers=headers, timeout=timeout)
resp.raise_for_status() if resp.status_code != 200:
return resp.status_code
print(f"Error: {resp.status_code} - {resp.text}")
# resp.raise_for_status()
return resp.json() return resp.json()
def _api_post(self, path, json="", timeout=120): def _api_post(self, path, json="", timeout=120):
@@ -209,7 +232,7 @@ class Portainer:
def get_stacks(self, endpoint_id="all", timeout=20): def get_stacks(self, endpoint_id="all", timeout=20):
'''Get a list of stacks for a specific endpoint or all endpoints.''' '''Get a list of stacks for a specific endpoint or all endpoints.'''
if endpoint_id != "all": if endpoint_id != "all":
endpoint_id = self.get_endpoint_id(endpoint_id) endpoint_id = self.get_endpoint_id()
path = "/stacks" path = "/stacks"
stcks = [] stcks = []
stacks = self._api_get(path, timeout=timeout) stacks = self._api_get(path, timeout=timeout)
@@ -275,13 +298,13 @@ class Portainer:
def get_services(self, endpoint, timeout=30): def get_services(self, endpoint, timeout=30):
'''Get a list of services for a specific stack on an endpoint.''' '''Get a list of services for a specific stack on an endpoint.'''
print(json.dumps(self.all_data,indent=2)) # print(json.dumps(self.all_data,indent=2))
path = f"/endpoints/{endpoint}/docker/services" path = f"/endpoints/{self.get_endpoint_id()}/docker/services"
print(path) # print(path)
#path += f'?filters={{"label": ["com.docker.compose.project={stack}"]}}' # path += f'?filters={{"label": ["com.docker.compose.project={stack}"]}}'
services = self._api_get(path, timeout=timeout) services = self._api_get(path, timeout=timeout)
return services return services
def update_status(self, endpoint, stack): def update_status(self, endpoint, stack):
'''Get the update status of a specific stack on an endpoint.''' '''Get the update status of a specific stack on an endpoint.'''
path = f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true" path = f"/stacks/{self.all_data['stacks'][endpoint]['by_name'][stack]}/images_status?refresh=true"
@@ -289,78 +312,137 @@ class Portainer:
stats = self._api_get(path) stats = self._api_get(path)
print(stats) print(stats)
def get_endpoint_id(self, endpoint): def get_endpoint_id(self):
'''Get endpoint ID from either ID or name input.''' '''Get endpoint ID from either ID or name input.'''
if self._is_number(endpoint): if self._is_number(self.args.endpoint_id):
self.endpoint_id = endpoint self.endpoint_id = self.args.endpoint_id
self.endpoint_name = self.endpoints["by_id"][endpoint] self.endpoint_name = self.endpoints["by_id"][self.args.endpoint_id]
return endpoint return self.args.endpoint_id
else: else:
self.endpoint_name = endpoint self.endpoint_name = self.args.endpoint_id
self.endpoint_id = self.endpoints["by_name"][endpoint] self.endpoint_id = self.endpoints["by_name"][self.args.endpoint_id]
return self.endpoints["by_name"][endpoint] return self.endpoints["by_name"][self.args.endpoint_id]
def get_endpoint_name(self, endpoint): def get_endpoint_name(self, endpoint):
'''Get endpoint name from either ID or name input.''' '''Get endpoint name from either ID or name input.'''
if self._is_number(endpoint): if self._is_number(endpoint):
self.endpoint_id = endpoint self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint] self.endpoint_name = self.all_data["endpoints"]["by_id"][endpoint]
return self.endpoints["by_id"][endpoint] return self.all_data["endpoints"]["by_id"][endpoint]
else: else:
self.endpoint_name = endpoint self.endpoint_name = endpoint
self.endpoint_id = self.endpoints["by_name"][endpoint] self.endpoint_id = self.all_data["endpoints"]["by_name"][endpoint]
return endpoint return endpoint
def get_containers(self, endpoint="all", stack="all", timeout=30): def refresh_in_containers(self):
print("Refreshing containers")
'''Get a list of containers for a specific endpoint and stack.''' '''Get a list of containers for a specific endpoint and stack.'''
# print(json.dumps(self.all_data,indent=2)) # print(json.dumps(self.all_data,indent=2))
# print(endpoint) # print(endpoint)
# print(stack) # print(stack)
cont = [] cont = []
data = {} data = {}
if endpoint == "all":
for s in self.all_data["endpoints"]["by_id"]: eps = [ep for ep in self.all_data['endpoints']['by_id'].keys()]
# print(s) #input(eps)
if stack == "all": for endpoint in eps:
if s not in self.all_data["stacks"]: if self.all_data["endpoints_status"][endpoint] != 1:
continue print("Endpoint down")
if self.all_data["endpoints_status"][s] != 1: # print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline")
continue
path = (
f"/endpoints/{endpoint}/docker/containers/json?all=1"
)
logging.info(f"request : {path}")
try:
containers = self._api_get(path)
#input(json.dumps(containers, indent=2))
except Exception as e:
print(f"failed to get containers from {path}: {e}")
continue
contr = []
try:
for c in containers:
#input(c)
cont.append([c["Names"][0].replace("/", ""),c["Id"], c['Image']])
contr.append([c["Names"][0].replace("/", ""), c["Id"], c['Image']])
if self.all_data["endpoints"]["by_id"][endpoint] in data:
data[self.all_data["endpoints"]["by_id"][endpoint]] = contr
data[endpoint] = contr
else:
data[self.all_data["endpoints"]["by_id"][endpoint]] = contr
data[endpoint] = contr
except Exception as e:
logger.debug(
f"Exception while getting containers for stack {e} ",
f"on endpoint {self.all_data['endpoints']['by_id'][endpoint]}: {e}",
)
self.all_data["containers"] = data
#print(cont)
return cont
def get_containers(self):
'''Get a list of containers for a specific endpoint and stack.'''
# print(json.dumps(self.all_data,indent=2))
# print(endpoint)
# print(stack)
cont = []
data = {}
if self.args.endpoint_id == "all":
eps = [ep for ep in self.all_data['endpoints']['by_id'].keys()]
else:
eps = [self.get_endpoint_id()]
#input(eps)
for endpoint in eps:
# print(s)
#print(self.args.stack)
if self.args.stack in ["all", None]:
# input([id for id in self.all_data["stacks"][endpoint]['by_id'].keys()])
for s in [id for id in self.all_data["stacks"][endpoint]['by_id'].keys()]:
# if s not in self.all_data["stacks"]:
# continue
#input(self.all_data)
if self.all_data["endpoints_status"][endpoint] != 1:
# print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline") # print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline")
continue continue
for e in self.all_data["stacks"][s]["by_name"]: # input(self.all_data["stacks"][endpoint]["by_name"])
for e in self.all_data["stacks"][endpoint]["by_name"]:
#input(e)
path = ( path = (
f"/endpoints/{s}/docker/containers/json" f"/endpoints/{endpoint}/docker/containers/json"
f'?all=1&filters={{"label": ["com.docker.compose.project={e}"]}}' f'?all=1&filters={{"label": ["com.docker.compose.project={e}"]}}'
) )
logging.info(f"request : {path}") logging.info(f"request : {path}")
try: try:
containers = self._api_get(path) containers = self._api_get(path)
#input(containers)
except Exception as e: except Exception as e:
print(f"failed to get containers from {path}: {e}") print(f"failed to get containers from {path}: {e}")
continue continue
contr = [] contr = []
try: try:
for c in containers: for c in containers:
# input(c)
cont.append(c["Names"][0].replace("/", "")) cont.append(c["Names"][0].replace("/", ""))
contr.append(c["Names"][0].replace("/", "")) contr.append(c["Names"][0].replace("/", ""))
if self.all_data["endpoints"]["by_id"][s] in data: if self.all_data["endpoints"]["by_id"][endpoint] in data:
data[self.all_data["endpoints"]["by_id"][s]][e] = contr data[self.all_data["endpoints"]["by_id"][endpoint]][e] = contr
else: else:
data[self.all_data["endpoints"]["by_id"][s]] = { data[self.all_data["endpoints"]["by_id"][endpoint]] = {
e: contr e: contr
} }
except Exception as e: except Exception as e:
logger.debug( logger.debug(
f"Exception while getting containers for stack {e} ", f"Exception while getting containers for stack {e} ",
f"on endpoint {self.all_data['endpoints']['by_id'][s]}: {e}", f"on endpoint {self.all_data['endpoints']['by_id'][endpoint]}: {e}",
) )
# print(data)
self.all_data["containers"] = data
else:
self.get_containers()
for i in self.all_data["containers"][endpoint][stack]: self.all_data["containers"] = data
cont.append(i)
#print(cont)
return cont return cont
def stop_containers(self, endpoint, containers, timeout=130): def stop_containers(self, endpoint, containers, timeout=130):
@@ -399,7 +481,11 @@ class Portainer:
for s in self.all_data["webhooks"][endpoint]: for s in self.all_data["webhooks"][endpoint]:
stcs.append([s, self.all_data["webhooks"][endpoint][s]]) stcs.append([s, self.all_data["webhooks"][endpoint][s]])
else: else:
stcs.append([stack, self.all_data["webhooks"][endpoint][stack]]) try:
stcs.append([stack, self.all_data["webhooks"][endpoint][stack]])
except Exception as e:
print(f"Error: Stack {stack} not found on endpoint {endpoint}: {e}")
# input(stcs) # input(stcs)
def update(c): def update(c):
@@ -517,7 +603,7 @@ class Portainer:
p = "standalone" p = "standalone"
env_path = f"{self.repo_dir}/{stack}/.env" env_path = f"{self.repo_dir}/{stack}/.env"
# input(swarm_id) # input(swarm_id)
self.endpoint_id = self.get_endpoint_id(endpoint) self.endpoint_id = self.get_endpoint_id()
if os.path.exists(self.repo_dir): if os.path.exists(self.repo_dir):
shutil.rmtree(self.repo_dir) shutil.rmtree(self.repo_dir)
else: else:
@@ -724,7 +810,6 @@ class Portainer:
} }
self._api_post_file(path, self.endpoint_id, stack, envs, file) self._api_post_file(path, self.endpoint_id, stack, envs, file)
def print_stacks(self, endpoint="all"): def print_stacks(self, endpoint="all"):
"""Print a table of stacks, optionally filtered by endpoint.""" """Print a table of stacks, optionally filtered by endpoint."""
stacks = self.get_stacks() stacks = self.get_stacks()
@@ -732,6 +817,7 @@ class Portainer:
data = [] data = []
stack_names = [] stack_names = []
for stack in stacks: for stack in stacks:
print(stack)
if endpoint is not None: if endpoint is not None:
if not stack["EndpointId"] in self.endpoints["by_id"]: if not stack["EndpointId"] in self.endpoints["by_id"]:
continue continue
@@ -752,65 +838,230 @@ class Portainer:
logger.debug( logger.debug(
"KeyError getting endpoint name for stack %s : %s", stack["Name"], e "KeyError getting endpoint name for stack %s : %s", stack["Name"], e
) )
count += 1
data = sorted(data, key=lambda x: x[1])
headers = ["StackID", "Name", "Endpoint"] headers = ["StackID", "Name", "Endpoint"]
print(tabulate.tabulate(data, headers=headers, tablefmt="github")) print(tabulate.tabulate(data, headers=headers, tablefmt="github"))
print(f"Total stacks: {count}") print(f"Total stacks: {count}")
# print(sorted(stack_names)) # print(sorted(stack_names))
def update_service(self, endpoint_id=None, service_id=None): def update_containers(self):
all_services = self.get_services(self.get_endpoint_id(endpoint_id)) all_containers = self.all_data["containers"][self.args.endpoint_id]
#input(all_containers)
service_tuples = [(s[1], s[0]) for s in all_containers if "." not in s[0]]
service_tuples = sorted(service_tuples, key=lambda x: x[1])
service_dict = dict(service_tuples)
# input(service_tuples)
if self.args.service_id is None:
#services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
service_tuples.insert(0, ("__ALL__", "[Select ALL]"))
service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]"))
service_ids = checkboxlist_dialog(
title="Select one service",
text="Choose a service:",
values=service_tuples
).run()
elif self.args.service_id == "all":
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
else:
service_ids = [self.args.service_id]
if "__ONLY_CHECK__" in service_ids or self.args.update is False:
pull = False
print("Checking for updates only...")
else:
print("Checking for updates and pulling updates...")
pull = True
if "__ALL__" in service_ids:
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
longest = 0
for a in service_dict.items():
# print(a[1])
if len(a[1]) > longest:
longest = len(a[1])
#print(longest)
ok = "\033[92m✔\033[0m"
err = "\033[91m✖\033[0m"
for service_id in service_ids:
# print(self.all_data["containers"][self.args.endpoint_id])
print("\033[?25l", end="")
print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True)
path = f"/docker/{self.get_endpoint_id()}/containers/{service_id}/image_status?refresh=true"
try:
resp = self._api_get(path, timeout=20)
except ValueError as e:
print(f"Error restarting service: {e}")
return []
#print(resp)
if resp == 500:
print("?")
elif resp['Status'] == "outdated":
if pull:
self.recreate_container(service_id, pull)
#print(f"Service {service_dict[service_id]:<{longest}} : updated")
self.gotify_message(f"Service {service_dict[service_id]} updated")
print(ok, end=" ")
for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]:
if name.startswith(service_dict[service_id]):
print(image)
else:
print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True)
#print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m")
self.gotify_message(f"Service update available for {service_dict[service_id]}")
print(err, end=" ")
for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]:
if name.startswith(service_dict[service_id]):
print(image)
else:
print(ok, end=" ")
for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]:
if name.startswith(service_dict[service_id]):
print(image)
print("\033[?25h", end="")
return True
def update_service(self):
all_services = self.get_services(self.get_endpoint_id())
#input(all_services)
service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services]
service_tuples = sorted(service_tuples, key=lambda x: x[1])
service_dict = dict(service_tuples)
# input(service_tuples)
if self.args.service_id is None:
#services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
service_tuples.insert(0, ("__ALL__", "[Select ALL]"))
service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]"))
service_ids = checkboxlist_dialog(
title="Select one service",
text="Choose a service:",
values=service_tuples
).run()
elif self.args.service_id == "all":
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
else:
service_ids = [self.args.service_id]
if "__ONLY_CHECK__" in service_ids or self.args.update is False:
pull = False
print("Checking for updates only...")
else:
print("Checking for updates and pulling updates...")
pull = True
if "__ALL__" in service_ids:
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
longest = 0
for a in service_dict.items():
# print(a[1])
if len(a[1]) > longest:
longest = len(a[1])
#print(longest)
ok = "\033[92m✔\033[0m"
err = "\033[91m✖\033[0m"
for service_id in service_ids:
print("\033[?25l", end="")
print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True)
path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true"
try:
resp = self._api_get(path, timeout=20)
except ValueError as e:
print(f"Error restarting service: {e}")
return []
if resp['Status'] == "outdated":
if pull:
self.restart_srv(service_id, pull)
#print(f"Service {service_dict[service_id]:<{longest}} : updated")
self.gotify_message(f"Service {service_dict[service_id]} updated")
print(ok)
else:
print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True)
#print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m")
self.gotify_message(f"Service update available for {service_dict[service_id]}")
print(err)
else:
print(ok)
print("\033[?25h", end="")
return True
def update_service2(self):
all_services = self.get_services(self.get_endpoint_id(self.args.endpoint_id))
service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services] service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services]
service_tuples = sorted(service_tuples, key=lambda x: x[1])
input(service_tuples) service_dict = dict(service_tuples)
if service_id == "all": # input(service_tuples)
if self.args.service_id is None:
#services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
service_tuples.insert(0, ("__ALL__", "[Select ALL]"))
service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]"))
service_ids = checkboxlist_dialog(
title="Select one service",
text="Choose a service:",
values=service_tuples
).run()
elif self.args.service_id == "all":
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
else:
service_ids = [self.args.service_id]
if "__ONLY_CHECK__" in service_ids or self.args.update is False:
pull = False
print("Checking for updates only...")
else:
print("Checking for updates and pulling updates...")
pull = True
if "__ALL__" in service_ids:
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
longest = 0
for a in service_dict.items():
# print(a[1])
if len(a[1]) > longest:
longest = len(a[1])
#print(longest)
ok = "\033[92m✔\033[0m"
err = "\033[91m✖\033[0m"
for service_id in service_ids:
print("\033[?25l", end="")
print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True)
path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true"
service_id = self.all
#services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
services.insert(0, ("__ALL__", "[Select ALL]"))
service_ids = checkboxlist_dialog(
title="Select one service",
text="Choose a service:",
values=services
).run()
if service_ids == "__ALL__":
pass
print(service_ids)
service_dict = dict(service_ids)
services = self.get_services(self.endpoint_name, stack_id)
svc_name = service_dict.get(stack_id)
stack_svcs = []
svc_menu = []
for s in services:
try: try:
if svc_name in s['Spec']['Name']: resp = self._api_get(path, timeout=20)
stack_svcs.append([s['Version']['Index'], s['Spec']['Name']]) except ValueError as e:
svc_menu.append([s['ID'], s['Spec']['Name']]) print(f"Error restarting service: {e}")
except KeyError as e: return []
print(e)
if resp['Status'] == "outdated":
if pull:
service_id = radiolist_dialog( self.restart_srv(service_id, pull)
title="Select one service", #print(f"Service {service_dict[service_id]:<{longest}} : updated")
text="Choose a service:", self.gotify_message(f"Service {service_dict[service_id]} updated")
values=svc_menu print(ok)
).run() else:
print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True)
#print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m")
self.gotify_message(f"Service update available for {service_dict[service_id]}")
print(err)
else:
print(ok)
print("\033[?25h", end="")
return True
def recreate_container(self,service_id, pull=False):
"""Restart a service on an endpoint.""" """Restart a service on an endpoint."""
path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true" path = f"/endpoints/{self.endpoint_id}/containers/{service_id}/recreate"
print(path)
params={"pullImage": pull}
try: try:
resp = self._api_get(path, timeout=20) resp = self._api_post(path, json=params, timeout=20)
print(resp)
except ValueError as e: except ValueError as e:
print(f"Error restarting service: {e}") print(f"Error restarting service: {e}")
return [] return []
if resp['Status'] == "outdated":
self.restart_srv(service_id, True)
print(f"Service {service_id} : restarted")
return True
def restart_srv(self,service_id, pool=False): def restart_srv(self,service_id, pool=False):
"""Restart a service on an endpoint.""" """Restart a service on an endpoint."""
@@ -825,6 +1076,7 @@ class Portainer:
def restart_service(self, endpoint_id, service_id): def restart_service(self, endpoint_id, service_id):
stacks = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)] stacks = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
stacks = sorted(stacks, key=lambda x: x[1])
stack_id = radiolist_dialog( stack_id = radiolist_dialog(
title="Select one service", title="Select one service",
text="Choose a service:", text="Choose a service:",

View File

@@ -14,6 +14,7 @@ import json
import argparse import argparse
import tty import tty
import termios import termios
import hvac
from tabulate import tabulate from tabulate import tabulate
from port import Portainer from port import Portainer
from prompt_toolkit import prompt from prompt_toolkit import prompt
@@ -21,7 +22,24 @@ from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.shortcuts import checkboxlist_dialog from prompt_toolkit.shortcuts import checkboxlist_dialog
from prompt_toolkit.shortcuts import radiolist_dialog from prompt_toolkit.shortcuts import radiolist_dialog
VERSION = "0.1.13" VAULT_ADDR = os.environ.get("VAULT_ADDR", "http://192.168.77.101:8200")
try:
VAULT_TOKEN = os.environ.get("VAULT_TOKEN")
if VAULT_TOKEN is None:
raise KeyError
except KeyError:
VAULT_TOKEN = prompt("Valult root token : ", is_password=True)
os.environ["VAULT_TOKEN"] = VAULT_TOKEN
client = hvac.Client(url=VAULT_ADDR, token=VAULT_TOKEN)
# Check if connected
if client.is_authenticated():
print("Connected to Vault")
else:
raise Exception("Failed to authenticate with Vault")
# Specify the mount point of your KV engine
VERSION = "0.1.14"
defaults = { defaults = {
"endpoint_id": "vm01", "endpoint_id": "vm01",
@@ -34,8 +52,6 @@ defaults = {
cur_config = {} cur_config = {}
def load_config(defaults=defaults): def load_config(defaults=defaults):
'''Load configuration from /myapps/portainer.conf if it exists, else from env vars or defaults.''' '''Load configuration from /myapps/portainer.conf if it exists, else from env vars or defaults.'''
if os.path.exists("/myapps/portainer.conf"): if os.path.exists("/myapps/portainer.conf"):
@@ -71,7 +87,7 @@ def load_config(defaults=defaults):
cur_config = load_config(defaults) a = load_config(defaults)
# ENV_VARS = [ # ENV_VARS = [
# "PORTAINER_URL", # "PORTAINER_URL",
@@ -116,53 +132,21 @@ parser.add_argument(
default=None, default=None,
help="Service ID to limit service operations", help="Service ID to limit service operations",
) )
parser.add_argument( parser.add_argument("--stack", "-s", type=str, default=None, nargs="+", help="Stack ID for operations")
"--refresh-environment", "-R", action="store_true", help="List endpoints"
)
parser.add_argument(
"--list-endpoints", "-E", action="store_true", help="List endpoints"
)
parser.add_argument("--list-stacks", "-l", action="store_true", help="List stacks")
parser.add_argument("--print-all-data", "-A", action="store_true", help="List stacks")
parser.add_argument(
"--list-containers", "-c", action="store_true", help="List containers"
)
parser.add_argument("--update-stack", "-U", action="store_true", help="Update stacks")
parser.add_argument(
"--stop-containers", "-O", action="store_true", help="Stop containers"
)
parser.add_argument(
"--start-containers", "-X", action="store_true", help="Start containers"
)
parser.add_argument("--update-status", "-S", action="store_true", help="Update status")
parser.add_argument(
"--get-stack", metavar="NAME_OR_ID", help="Get stack by name or numeric id"
)
parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform") parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform")
parser.add_argument( parser.add_argument(
"--autostart", "-Z", action="store_true", help="Auto-start created stacks" "--autostart", "-Z", action="store_true", help="Auto-start created stacks"
) )
parser.add_argument("--start-stack", "-x", action="store_true") parser.add_argument("--update", "-u", action="store_true", help="Update service if it exists")
parser.add_argument("--stop-stack", "-o", action="store_true")
parser.add_argument("--secrets", "-q", action="store_true")
parser.add_argument("--debug", "-D", action="store_true") parser.add_argument("--debug", "-D", action="store_true")
parser.add_argument("--create-stack", "-n", action="store_true")
parser.add_argument("--create-stack_new2", "-N", action="store_true")
parser.add_argument("--gpu", "-g", action="store_true") parser.add_argument("--gpu", "-g", action="store_true")
parser.add_argument("--create-stacks", "-C", action="store_true")
parser.add_argument("--refresh-status", "-r", action="store_true")
parser.add_argument("--stack", "-s", type=str, nargs="+", help="Stack ID for operations")
parser.add_argument(
"--token-only", action="store_true", help="Print auth token and exit"
)
parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds") parser.add_argument("--timeout", type=int, default=10, help="Request timeout seconds")
parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode") parser.add_argument("--deploy-mode", "-m", type=str, default="git", help="Deploy mode")
parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode") parser.add_argument("--stack-mode", "-w", default=None, help="Stack mode")
args = parser.parse_args() args = parser.parse_args()
print("Running version:", VERSION) print("Running version:", VERSION)
print("Environment:", args.site) print("Environment:", args.site)
args.client = client
if args.site is not None: if args.site is not None:
cur_config["PORTAINER_SITE"] = args.site cur_config["PORTAINER_SITE"] = args.site
if args.endpoint_id is not None: if args.endpoint_id is not None:
@@ -235,10 +219,13 @@ def prompt_missing_args(args_in, defaults_in, fields, action=None,stacks=None):
longest = len(a) longest = len(a)
for field, text in fields: for field, text in fields:
# print(field)
value_in = getattr(args_in, field) value_in = getattr(args_in, field)
default = defaults_in.get(f"PORTAINER_{field}".upper()) default = defaults_in.get(f"PORTAINER_{field}".upper())
cur_site = defaults_in.get("PORTAINER_SITE".upper()) cur_site = defaults_in.get("PORTAINER_SITE".upper())
cur_env = defaults_in.get("PORTAINER_ENVIRONMENT_ID".upper()) cur_env = defaults_in.get("PORTAINER_ENVIRONMENT_ID".upper())
# print(value_in)
if value_in is None: if value_in is None:
if default is not None: if default is not None:
prompt_text = f"{text} (default={default}) : " prompt_text = f"{text} (default={default}) : "
@@ -256,7 +243,7 @@ def prompt_missing_args(args_in, defaults_in, fields, action=None,stacks=None):
# input(json.dumps(stacks, indent=2)) # input(json.dumps(stacks, indent=2))
commands = [ commands = [
'authentik', 'bitwarden', 'bookstack', 'dockermon', 'fail2ban', 'gitea', 'gitlab', 'grafana', 'authentik', 'bitwarden', 'bookstack', 'dockermon', 'fail2ban', 'gitea', 'gitlab', 'grafana',
'home-assistant', 'homepage', 'immich', 'influxdb', 'jupyter', 'kestra', 'mailu3', 'hashicorp', 'home-assistant', 'homepage', 'immich', 'influxdb', 'jupyter', 'kestra', 'mailu3',
'mealie', 'mediacenter', 'mosquitto', 'motioneye', 'n8n', 'nebula', 'nextcloud', 'nginx', 'mealie', 'mediacenter', 'mosquitto', 'motioneye', 'n8n', 'nebula', 'nextcloud', 'nginx',
'node-red', 'octoprint', 'ollama', 'onlyoffice', 'paperless-ngx', 'pihole', 'portainer-ce', 'rancher', 'registry', 'node-red', 'octoprint', 'ollama', 'onlyoffice', 'paperless-ngx', 'pihole', 'portainer-ce', 'rancher', 'registry',
'regsync', 'semaphore', 'unifibrowser', 'uptime-kuma', 'watchtower', 'wazuh', 'webhub', 'wordpress', 'regsync', 'semaphore', 'unifibrowser', 'uptime-kuma', 'watchtower', 'wazuh', 'webhub', 'wordpress',
@@ -383,13 +370,13 @@ def prompt_missing_args(args_in, defaults_in, fields, action=None,stacks=None):
return args return args
if __name__ == "__main__": if __name__ == "__main__":
# Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below. # Example usage: set PORTAINER_USER and PORTAINER_PASS in env, or pass literals below.
# token = get_portainer_token(base,"admin","l4c1j4yd33Du5lo") # or get_portainer_token(base, "admin", "secret") # token = get_portainer_token(base,"admin","l4c1j4yd33Du5lo") # or get_portainer_token(base, "admin", "secret")
def signal_handler(sig, frame): def signal_handler(sig, frame):
logger.warning("Killed manually %s, %s", sig, frame) logger.warning("Killed manually %s, %s", sig, frame)
print("\nTerminated by user") print("\nTerminated by user")
print("\033[?25h", end="")
sys.exit(0) sys.exit(0)
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
os.system("cls" if os.name == "nt" else "clear") os.system("cls" if os.name == "nt" else "clear")
@@ -401,6 +388,7 @@ if __name__ == "__main__":
("start_stack","start_stack"), ("start_stack","start_stack"),
("restart_service","restart_service"), ("restart_service","restart_service"),
("update_service","update_service"), ("update_service","update_service"),
("update_containers","update_containers"),
("list_stacks","list_stacks"), ("list_stacks","list_stacks"),
("update_stack","update_stack"), ("update_stack","update_stack"),
("secrets","secrets"), ("secrets","secrets"),
@@ -418,7 +406,7 @@ if __name__ == "__main__":
title="Select one service", title="Select one service",
text="Choose a service:", text="Choose a service:",
values=actions values=actions
).run() ).run()
@@ -438,7 +426,7 @@ if __name__ == "__main__":
os.system("cls" if os.name == "nt" else "clear") os.system("cls" if os.name == "nt" else "clear")
# Example: list endpoints # Example: list endpoints
por = Portainer(cur_config["PORTAINER_SITE"], timeout=args.timeout) por = Portainer(cur_config["PORTAINER_SITE"], args)
por.set_defaults(cur_config) por.set_defaults(cur_config)
if args.debug: if args.debug:
por._debug = True por._debug = True
@@ -487,6 +475,8 @@ if __name__ == "__main__":
if args.action == "create_stack": if args.action == "create_stack":
por.action = "create_stack" por.action = "create_stack"
print(cur_config)
print(args)
args = prompt_missing_args( args = prompt_missing_args(
args, args,
cur_config, cur_config,
@@ -548,6 +538,9 @@ if __name__ == "__main__":
sys.exit() sys.exit()
if args.action == "update_service": if args.action == "update_service":
args = prompt_missing_args( args = prompt_missing_args(
args, args,
cur_config, cur_config,
@@ -556,8 +549,24 @@ if __name__ == "__main__":
("endpoint_id", "Endpoint ID") ("endpoint_id", "Endpoint ID")
], ],
) )
por.update_service(args.endpoint_id, args.service_id) por.update_service()
sys.exit() sys.exit()
if args.action == "update_containers":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.update_containers()
sys.exit()
if args.action == "list_stacks": if args.action == "list_stacks":
args = prompt_missing_args( args = prompt_missing_args(
args, args,
@@ -573,12 +582,10 @@ if __name__ == "__main__":
if args.action == "list_containers": if args.action == "list_containers":
print("Getting containers") print("Getting containers")
por.get_containers(args.endpoint_id, args.stack) print(por.get_containers())
sys.exit() sys.exit()
if args.action == "update_stack": if args.action == "update_stack":
print("Updating stacks") print("Updating stacks")
por.update_stack(args.endpoint_id, args.stack, args.autostart) por.update_stack(args.endpoint_id, args.stack, args.autostart)
@@ -591,7 +598,7 @@ if __name__ == "__main__":
sys.exit() sys.exit()
if args.action == "list_endpoints": if args.action == "list_endpoints":
eps = por.get_endpoints() eps = por.get_endpoints(args)
export_data = [] export_data = []
for i in eps["by_id"]: for i in eps["by_id"]:
export_data.append([i, eps["by_id"][i]]) export_data.append([i, eps["by_id"][i]])