"""Convert a docker-compose file into a Docker Swarm stack file.

Usage: python convert_to_swarm.py app_name

Reads ``<app_name>/docker-compose.yml`` and writes
``__swarm/<app_name>/<app_name>-swarm.yml``.
"""
import copy
import os
import sys


def default_deploy():
    """Return a fresh default ``deploy`` section for one Swarm service.

    A new dict (with new nested containers) is built on every call, so
    services never share mutable state and PyYAML has nothing to alias.
    """
    return {
        "mode": "replicated",
        "replicas": 1,
        "restart_policy": {"condition": "any"},
        "labels": {},
        "placement": {
            "constraints": [
                "node.role == manager"
            ]
        }
    }


def _lowercase_str(value):
    """Return *value* as a lower-cased string (``True`` -> ``"true"``)."""
    return str(value).lower()


def _stringify_bool(value):
    """Turn a boolean into ``"true"``/``"false"``; return anything else as is.

    YAML parses unquoted ``true``/``false`` as booleans; the Compose file
    reference asks for such values to be quoted, i.e. to be strings.
    """
    if isinstance(value, bool):
        return str(value).lower()
    return value


def _labels_to_dict(labels, normalize):
    """Return *labels* (mapping or ``KEY=VALUE`` list) as a plain dict.

    Args:
        labels: Label section as loaded from YAML: a dict, a list of
            ``"key=value"`` strings, or anything else (treated as empty).
        normalize: Callable applied to each value of the mapping form.

    List items without ``=`` are skipped; list values are lower-cased.
    """
    if isinstance(labels, dict):
        return {name: normalize(value) for name, value in labels.items()}
    if isinstance(labels, list):
        parsed = {}
        for item in labels:
            if "=" in item:
                name, value = item.split("=", 1)
                parsed[name] = str(value).lower()
        return parsed
    return {}


def _normalize_environment(environment):
    """Return a copy of a mapping-style environment with booleans as strings.

    Only booleans are converted; other values (passwords, URLs, numbers)
    are left untouched so case-sensitive data is never altered. List-style
    environments (``["KEY=value", ...]``) are already strings and are
    returned unchanged.
    """
    if isinstance(environment, dict):
        return {
            name: _stringify_bool(value)
            for name, value in environment.items()
        }
    return environment


def convert_service(service):
    """Convert one compose service definition into its Swarm equivalent.

    Args:
        service: The service mapping as loaded from the compose file.

    Returns:
        A new dict with every key of *service* except ``labels`` (these
        are moved to ``deploy.labels``), plus a ``deploy`` section built
        from :func:`default_deploy` merged with the service's own
        ``deploy`` section, if any.
    """
    # Create a fresh deploy section each time (avoids YAML anchors)
    deploy_section = default_deploy()
    swarm_service = {}

    for key, value in service.items():
        if key == "labels":
            # Move labels → deploy.labels (values are lower-cased strings).
            deploy_section["labels"].update(
                _labels_to_dict(value, _lowercase_str)
            )
            continue
        if key == "environment":
            value = _normalize_environment(value)
        # A "deploy" key is copied here too and replaced below; copying it
        # keeps the key at its original position in the output.
        swarm_service[key] = value

    # Merge user deploy section if present ("deploy:" left empty is None).
    user_deploy = service.get("deploy") or {}

    # merge deploy.labels
    deploy_section["labels"].update(
        _labels_to_dict(user_deploy.get("labels"), _stringify_bool)
    )

    # merge placement: constraints are appended to the default manager
    # constraint, any other placement key (e.g. preferences) is carried over.
    for pk, pv in (user_deploy.get("placement") or {}).items():
        if pk == "constraints":
            deploy_section["placement"]["constraints"].extend(pv or [])
        else:
            deploy_section["placement"][pk] = copy.deepcopy(pv)

    # merge other keys
    for dk, dv in user_deploy.items():
        if dk not in ("labels", "placement"):
            deploy_section[dk] = copy.deepcopy(dv)

    swarm_service["deploy"] = deploy_section
    return swarm_service


def convert_compose_to_swarm(app):
    """Read ``<app>/docker-compose.yml`` and write the Swarm stack file.

    The result is written to ``__swarm/<app>/<app>-swarm.yml``; the output
    directory is created when missing.

    Args:
        app: Application (directory) name.
    """
    # Imported here because this is the only function that needs PyYAML;
    # the pure dict helpers above stay importable without it.
    import yaml

    output_file = "__swarm/" + app + "/" + app + "-swarm.yml"
    input_file = app + "/docker-compose.yml"

    with open(input_file, "r", encoding="utf-8") as f:
        # An empty compose file loads as None.
        compose = yaml.safe_load(f) or {}

    swarm = {"version": "3.9", "services": {}}

    for name, service in (compose.get("services") or {}).items():
        swarm["services"][name] = convert_service(service or {})

    for section in ["networks", "volumes", "configs", "secrets"]:
        if section in compose:
            swarm[section] = compose[section]

    # Prevent PyYAML from creating anchors
    class NoAliasDumper(yaml.SafeDumper):
        def ignore_aliases(self, data):
            return True

    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        yaml.dump(swarm, f, sort_keys=False, Dumper=NoAliasDumper)

    print(f"✔ Swarm file written to: {output_file}")


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python convert_to_swarm.py app_name")
        sys.exit(1)

    convert_compose_to_swarm(sys.argv[1])