Compare commits

...

3 Commits

Author SHA1 Message Date
04f5a059a4 build 2025-12-13 22:15:40 +01:00
24492e1ec9 build 2025-12-13 21:42:38 +01:00
54da7d2764 build 2025-12-13 19:32:29 +01:00
2 changed files with 278 additions and 43 deletions

294
port.py
View File

@@ -115,7 +115,7 @@ class Portainer:
self.get_site(site)
self.get_endpoints()
self.get_stacks()
self.get_containers()
self.refresh_in_containers()
def set_defaults(self, config):
'''Set default configuration from provided config dictionary.'''
@@ -169,7 +169,10 @@ class Portainer:
url = f"{self.base_url.rstrip('/')}{path}"
headers = {"X-API-Key": f"{self.token}"}
resp = requests.get(url, headers=headers, timeout=timeout)
resp.raise_for_status()
if resp.status_code != 200:
return resp.status_code
print(f"Error: {resp.status_code} - {resp.text}")
# resp.raise_for_status()
return resp.json()
def _api_post(self, path, json="", timeout=120):
@@ -229,7 +232,7 @@ class Portainer:
def get_stacks(self, endpoint_id="all", timeout=20):
'''Get a list of stacks for a specific endpoint or all endpoints.'''
if endpoint_id != "all":
endpoint_id = self.get_endpoint_id(endpoint_id)
endpoint_id = self.get_endpoint_id()
path = "/stacks"
stcks = []
stacks = self._api_get(path, timeout=timeout)
@@ -296,7 +299,7 @@ class Portainer:
def get_services(self, endpoint, timeout=30):
'''Get a list of services for a specific stack on an endpoint.'''
# print(json.dumps(self.all_data,indent=2))
path = f"/endpoints/{self.get_endpoint_id(endpoint)}/docker/services"
path = f"/endpoints/{self.get_endpoint_id()}/docker/services"
# print(path)
# path += f'?filters={{"label": ["com.docker.compose.project={stack}"]}}'
services = self._api_get(path, timeout=timeout)
@@ -309,78 +312,137 @@ class Portainer:
stats = self._api_get(path)
print(stats)
def get_endpoint_id(self, endpoint):
def get_endpoint_id(self):
'''Get endpoint ID from either ID or name input.'''
if self._is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint]
return endpoint
if self._is_number(self.args.endpoint_id):
self.endpoint_id = self.args.endpoint_id
self.endpoint_name = self.endpoints["by_id"][self.args.endpoint_id]
return self.args.endpoint_id
else:
self.endpoint_name = endpoint
self.endpoint_id = self.endpoints["by_name"][endpoint]
return self.endpoints["by_name"][endpoint]
self.endpoint_name = self.args.endpoint_id
self.endpoint_id = self.endpoints["by_name"][self.args.endpoint_id]
return self.endpoints["by_name"][self.args.endpoint_id]
def get_endpoint_name(self, endpoint):
'''Get endpoint name from either ID or name input.'''
if self._is_number(endpoint):
self.endpoint_id = endpoint
self.endpoint_name = self.endpoints["by_id"][endpoint]
return self.endpoints["by_id"][endpoint]
self.endpoint_name = self.all_data["endpoints"]["by_id"][endpoint]
return self.all_data["endpoints"]["by_id"][endpoint]
else:
self.endpoint_name = endpoint
self.endpoint_id = self.endpoints["by_name"][endpoint]
self.endpoint_id = self.all_data["endpoints"]["by_name"][endpoint]
return endpoint
def get_containers(self, endpoint="all", stack="all", timeout=30):
def refresh_in_containers(self):
print("Refreshing containers")
'''Get a list of containers for a specific endpoint and stack.'''
# print(json.dumps(self.all_data,indent=2))
# print(endpoint)
# print(stack)
cont = []
data = {}
if endpoint == "all":
for s in self.all_data["endpoints"]["by_id"]:
# print(s)
if stack == "all":
if s not in self.all_data["stacks"]:
continue
if self.all_data["endpoints_status"][s] != 1:
eps = [ep for ep in self.all_data['endpoints']['by_id'].keys()]
#input(eps)
for endpoint in eps:
if self.all_data["endpoints_status"][endpoint] != 1:
print("Endpoint down")
# print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline")
continue
for e in self.all_data["stacks"][s]["by_name"]:
path = (
f"/endpoints/{s}/docker/containers/json"
f'?all=1&filters={{"label": ["com.docker.compose.project={e}"]}}'
f"/endpoints/{endpoint}/docker/containers/json?all=1"
)
logging.info(f"request : {path}")
try:
containers = self._api_get(path)
#input(json.dumps(containers, indent=2))
except Exception as e:
print(f"failed to get containers from {path}: {e}")
continue
contr = []
try:
for c in containers:
#input(c)
cont.append([c["Names"][0].replace("/", ""),c["Id"], c['Image']])
contr.append([c["Names"][0].replace("/", ""), c["Id"], c['Image']])
if self.all_data["endpoints"]["by_id"][endpoint] in data:
data[self.all_data["endpoints"]["by_id"][endpoint]] = contr
data[endpoint] = contr
else:
data[self.all_data["endpoints"]["by_id"][endpoint]] = contr
data[endpoint] = contr
except Exception as e:
logger.debug(
f"Exception while getting containers for stack {e} ",
f"on endpoint {self.all_data['endpoints']['by_id'][endpoint]}: {e}",
)
self.all_data["containers"] = data
#print(cont)
return cont
def get_containers(self):
'''Get a list of containers for a specific endpoint and stack.'''
# print(json.dumps(self.all_data,indent=2))
# print(endpoint)
# print(stack)
cont = []
data = {}
if self.args.endpoint_id == "all":
eps = [ep for ep in self.all_data['endpoints']['by_id'].keys()]
else:
eps = [self.get_endpoint_id()]
#input(eps)
for endpoint in eps:
# print(s)
#print(self.args.stack)
if self.args.stack in ["all", None]:
# input([id for id in self.all_data["stacks"][endpoint]['by_id'].keys()])
for s in [id for id in self.all_data["stacks"][endpoint]['by_id'].keys()]:
# if s not in self.all_data["stacks"]:
# continue
#input(self.all_data)
if self.all_data["endpoints_status"][endpoint] != 1:
# print(f"Endpoint {self.all_data["endpoints"]["by_id"][s]} is offline")
continue
# input(self.all_data["stacks"][endpoint]["by_name"])
for e in self.all_data["stacks"][endpoint]["by_name"]:
#input(e)
path = (
f"/endpoints/{endpoint}/docker/containers/json"
f'?all=1&filters={{"label": ["com.docker.compose.project={e}"]}}'
)
logging.info(f"request : {path}")
try:
containers = self._api_get(path)
#input(containers)
except Exception as e:
print(f"failed to get containers from {path}: {e}")
continue
contr = []
try:
for c in containers:
# input(c)
cont.append(c["Names"][0].replace("/", ""))
contr.append(c["Names"][0].replace("/", ""))
if self.all_data["endpoints"]["by_id"][s] in data:
data[self.all_data["endpoints"]["by_id"][s]][e] = contr
if self.all_data["endpoints"]["by_id"][endpoint] in data:
data[self.all_data["endpoints"]["by_id"][endpoint]][e] = contr
else:
data[self.all_data["endpoints"]["by_id"][s]] = {
data[self.all_data["endpoints"]["by_id"][endpoint]] = {
e: contr
}
except Exception as e:
logger.debug(
f"Exception while getting containers for stack {e} ",
f"on endpoint {self.all_data['endpoints']['by_id'][s]}: {e}",
f"on endpoint {self.all_data['endpoints']['by_id'][endpoint]}: {e}",
)
# print(data)
self.all_data["containers"] = data
else:
self.get_containers()
for i in self.all_data["containers"][endpoint][stack]:
cont.append(i)
self.all_data["containers"] = data
#print(cont)
return cont
def stop_containers(self, endpoint, containers, timeout=130):
@@ -541,7 +603,7 @@ class Portainer:
p = "standalone"
env_path = f"{self.repo_dir}/{stack}/.env"
# input(swarm_id)
self.endpoint_id = self.get_endpoint_id(endpoint)
self.endpoint_id = self.get_endpoint_id()
if os.path.exists(self.repo_dir):
shutil.rmtree(self.repo_dir)
else:
@@ -755,6 +817,7 @@ class Portainer:
data = []
stack_names = []
for stack in stacks:
print(stack)
if endpoint is not None:
if not stack["EndpointId"] in self.endpoints["by_id"]:
continue
@@ -775,14 +838,156 @@ class Portainer:
logger.debug(
"KeyError getting endpoint name for stack %s : %s", stack["Name"], e
)
count += 1
data = sorted(data, key=lambda x: x[1])
headers = ["StackID", "Name", "Endpoint"]
print(tabulate.tabulate(data, headers=headers, tablefmt="github"))
print(f"Total stacks: {count}")
# print(sorted(stack_names))
def update_containers(self):
all_containers = self.all_data["containers"][self.args.endpoint_id]
#input(all_containers)
service_tuples = [(s[1], s[0]) for s in all_containers if "." not in s[0]]
service_tuples = sorted(service_tuples, key=lambda x: x[1])
service_dict = dict(service_tuples)
# input(service_tuples)
if self.args.service_id is None:
#services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
service_tuples.insert(0, ("__ALL__", "[Select ALL]"))
service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]"))
service_ids = checkboxlist_dialog(
title="Select one service",
text="Choose a service:",
values=service_tuples
).run()
elif self.args.service_id == "all":
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
else:
service_ids = [self.args.service_id]
if "__ONLY_CHECK__" in service_ids or self.args.update is False:
pull = False
print("Checking for updates only...")
else:
print("Checking for updates and pulling updates...")
pull = True
if "__ALL__" in service_ids:
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
longest = 0
for a in service_dict.items():
# print(a[1])
if len(a[1]) > longest:
longest = len(a[1])
#print(longest)
ok = "\033[92m✔\033[0m"
err = "\033[91m✖\033[0m"
for service_id in service_ids:
# print(self.all_data["containers"][self.args.endpoint_id])
print("\033[?25l", end="")
print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True)
path = f"/docker/{self.get_endpoint_id()}/containers/{service_id}/image_status?refresh=true"
try:
resp = self._api_get(path, timeout=20)
except ValueError as e:
print(f"Error restarting service: {e}")
return []
#print(resp)
if resp == 500:
print("?")
elif resp['Status'] == "outdated":
if pull:
self.recreate_container(service_id, pull)
#print(f"Service {service_dict[service_id]:<{longest}} : updated")
self.gotify_message(f"Service {service_dict[service_id]} updated")
print(ok, end=" ")
for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]:
if name.startswith(service_dict[service_id]):
print(image)
else:
print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True)
#print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m")
self.gotify_message(f"Service update available for {service_dict[service_id]}")
print(err, end=" ")
for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]:
if name.startswith(service_dict[service_id]):
print(image)
else:
print(ok, end=" ")
for name, hash_, image in self.all_data["containers"][self.args.endpoint_id]:
if name.startswith(service_dict[service_id]):
print(image)
print("\033[?25h", end="")
return True
def update_service(self):
all_services = self.get_services(self.get_endpoint_id())
#input(all_services)
service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services]
service_tuples = sorted(service_tuples, key=lambda x: x[1])
service_dict = dict(service_tuples)
# input(service_tuples)
if self.args.service_id is None:
#services = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
service_tuples.insert(0, ("__ALL__", "[Select ALL]"))
service_tuples.insert(0, ("__ONLY_CHECK__", "[Check Only]"))
service_ids = checkboxlist_dialog(
title="Select one service",
text="Choose a service:",
values=service_tuples
).run()
elif self.args.service_id == "all":
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
else:
service_ids = [self.args.service_id]
if "__ONLY_CHECK__" in service_ids or self.args.update is False:
pull = False
print("Checking for updates only...")
else:
print("Checking for updates and pulling updates...")
pull = True
if "__ALL__" in service_ids:
service_ids = [s[0] for s in service_tuples if s[0] != "__ALL__" and s[0] != "__ONLY_CHECK__"]
longest = 0
for a in service_dict.items():
# print(a[1])
if len(a[1]) > longest:
longest = len(a[1])
#print(longest)
ok = "\033[92m✔\033[0m"
err = "\033[91m✖\033[0m"
for service_id in service_ids:
print("\033[?25l", end="")
print(f"{service_dict[service_id]:<{longest}} ", end="", flush=True)
path = f"/docker/{self.endpoint_id}/services/{service_id}/image_status?refresh=true"
try:
resp = self._api_get(path, timeout=20)
except ValueError as e:
print(f"Error restarting service: {e}")
return []
if resp['Status'] == "outdated":
if pull:
self.restart_srv(service_id, pull)
#print(f"Service {service_dict[service_id]:<{longest}} : updated")
self.gotify_message(f"Service {service_dict[service_id]} updated")
print(ok)
else:
print(f"\r\033[4m{service_dict[service_id]:<{longest}}\033[0m ", end="", flush=True)
#print(f"\033[4m{service_dict[service_id]:<{longest}} {err}\033[0m")
self.gotify_message(f"Service update available for {service_dict[service_id]}")
print(err)
else:
print(ok)
print("\033[?25h", end="")
return True
def update_service2(self):
all_services = self.get_services(self.get_endpoint_id(self.args.endpoint_id))
service_tuples = [(s['ID'], s['Spec']['Name']) for s in all_services]
@@ -846,6 +1051,18 @@ class Portainer:
print("\033[?25h", end="")
return True
def recreate_container(self,service_id, pull=False):
"""Restart a service on an endpoint."""
path = f"/endpoints/{self.endpoint_id}/containers/{service_id}/recreate"
print(path)
params={"pullImage": pull}
try:
resp = self._api_post(path, json=params, timeout=20)
print(resp)
except ValueError as e:
print(f"Error restarting service: {e}")
return []
def restart_srv(self,service_id, pool=False):
"""Restart a service on an endpoint."""
path = f"/endpoints/{self.endpoint_id}/forceupdateservice"
@@ -859,6 +1076,7 @@ class Portainer:
def restart_service(self, endpoint_id, service_id):
stacks = [(s["Id"], s["Name"]) for s in self.get_stacks(endpoint_id)]
stacks = sorted(stacks, key=lambda x: x[1])
stack_id = radiolist_dialog(
title="Select one service",
text="Choose a service:",

View File

@@ -132,7 +132,7 @@ parser.add_argument(
default=None,
help="Service ID to limit service operations",
)
parser.add_argument("--stack", "-s", type=str, nargs="+", help="Stack ID for operations")
parser.add_argument("--stack", "-s", type=str, default=None, nargs="+", help="Stack ID for operations")
parser.add_argument("--action", "-a", type=str, default=None, help="Action to perform")
parser.add_argument(
"--autostart", "-Z", action="store_true", help="Auto-start created stacks"
@@ -388,6 +388,7 @@ if __name__ == "__main__":
("start_stack","start_stack"),
("restart_service","restart_service"),
("update_service","update_service"),
("update_containers","update_containers"),
("list_stacks","list_stacks"),
("update_stack","update_stack"),
("secrets","secrets"),
@@ -474,6 +475,8 @@ if __name__ == "__main__":
if args.action == "create_stack":
por.action = "create_stack"
print(cur_config)
print(args)
args = prompt_missing_args(
args,
cur_config,
@@ -548,6 +551,22 @@ if __name__ == "__main__":
)
por.update_service()
sys.exit()
if args.action == "update_containers":
args = prompt_missing_args(
args,
cur_config,
[
("site", "Site"),
("endpoint_id", "Endpoint ID")
],
)
por.update_containers()
sys.exit()
if args.action == "list_stacks":
args = prompt_missing_args(
args,
@@ -563,12 +582,10 @@ if __name__ == "__main__":
if args.action == "list_containers":
print("Getting containers")
por.get_containers(args.endpoint_id, args.stack)
print(por.get_containers())
sys.exit()
if args.action == "update_stack":
print("Updating stacks")
por.update_stack(args.endpoint_id, args.stack, args.autostart)
@@ -581,7 +598,7 @@ if __name__ == "__main__":
sys.exit()
if args.action == "list_endpoints":
eps = por.get_endpoints()
eps = por.get_endpoints(args)
export_data = []
for i in eps["by_id"]:
export_data.append([i, eps["by_id"][i]])