Files
python/scan_tasmotas.py
2023-02-17 21:18:08 +01:00

212 lines
7.7 KiB
Python

import paho.mqtt.client as mqtt
from getmac import get_mac_address
import platform
import socket
import re
import subprocess
import os
import json
import sys
import time
import json
import datetime
import ctypes
import getopt
import requests
CHECK_MARK = "\033[0;32m\xE2\x9C\x94\033[0m"
_MQTT_SETUP = _BACKUP = _APPLY = _WIFI_SETUP = _UPGRADE_FW = False
try:
opts, args = getopt.getopt(sys.argv[1:], "wh:bmu", ["command=", "help", "output=", "backup"])
except getopt.GetoptError as err:
# print help information and exit:
print(str(err)) # will print something like "option -a not recognized"
#usage()
sys.exit(2)
output = None
# QJ : getopts
for o, a in opts:
if o == "-h":
_ACTION = True
elif o in ("-b", "--backup"):
_BACKUP = True
elif o in ("-m", "--mqtt"):
_MQTT_SETUP = True
elif o in ("-w", "--wifi"):
_WIFI_SETUP = True
elif o in ("-a", "--apply"):
_APPLY = True
elif o in ("-u", "--upgrade_fw"):
_UPGRADE_FW = True
else:
_WIZZARD = True
def create_backup():
cmnd = "nmap -sP 192.168.77.*|grep \"Nmap scan report\"|egrep -o \"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\""
print(cmnd)
status, output = subprocess.getstatusoutput(cmnd)
print(output)
ips = output.splitlines()
tasm_data = {}
for sys_ip in ips:
# print("Thsis is ip : " + sys_ip)
cmnd = "nmap " + sys_ip + " -p80|grep \"80/tcp open http\""
status, output = subprocess.getstatusoutput(cmnd)
# print("status of 80 : " + str(status))
if status == 0:
try:
url = "http://" + sys_ip + "/cm?user=admin&password=l4c1j4yd33Du5l0&cmnd=STATUS+5"
resp = requests.get(url)
json_str = resp.content.decode('utf-8')
tasm_data["StatusNET"] = json.loads(json_str)["StatusNET"]
# print(tasm_data)
url = "http://" + sys_ip + "/cm?user=admin&password=l4c1j4yd33Du5l0&cmnd=STATUS+2"
resp = requests.get(url)
json_str = resp.content.decode('utf-8')
tasm_data["StatusFWR"] = json.loads(json_str)["StatusFWR"]
url = "http://" + sys_ip + "/cm?user=admin&password=l4c1j4yd33Du5l0&cmnd=STATUS+4"
resp = requests.get(url)
json_str = resp.content.decode('utf-8')
tasm_data["StatusMEM"] = json.loads(json_str)["StatusMEM"]
print(sys_ip + " : Hostname : " + tasm_data['StatusNET']['Hostname'] + " : Version : " + tasm_data['StatusFWR']["Version"] + " : Memory : " + str(tasm_data["StatusMEM"]["ProgramFlashSize"]))
# cmnd = "curl \"http://" + sys_ip + "/dl?user=admin&password=l4c1j4yd33Du5l0\" -o ~/tasm_back/" + tasm_data['StatusNET']['Hostname'] + ".bin"
#cmnd = "curl \"http://" + sys_ip + "/dl\" -o /media/server/nas-docker-data/_BACKUP/tasmota/" + tasm_data['StatusNET']['Hostname'] + ".bin"
# print(cmnd)
# status, output = subprocess.getstatusoutput(cmnd)
url = "http://" + sys_ip + "/dl?user=admin&password=l4c1j4yd33Du5l0"
resp = requests.get(url)
file = open("/media/nas/nas-data/_BACKUP/tasmota/" + tasm_data['StatusNET']['Hostname'] + "_" + tasm_data['StatusFWR']["Version"] + ".bin", "wb")
file.write(resp.content)
file.close()
print(CHECK_MARK)
except:
#print(sys_ip + " : Not a tasmota!")
pass
else:
#print(sys_ip + " : Not a listening")
pass
def upgrade_fw():
cmnd = "nmap -sP 192.168.77.*|grep \"Nmap scan report\"|egrep -o \"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\""
#print(cmnd)
status, output = subprocess.getstatusoutput(cmnd)
# print(output)
ips = output.splitlines()
for sys_ip in ips:
print("Thsis is ip : " + sys_ip)
cmnd = "nmap " + sys_ip + " -p80|grep \"80/tcp open http\""
status, output = subprocess.getstatusoutput(cmnd)
# print("status of 80 : " + str(status))
if status == 0:
try:
# url = "http://" + sys_ip + "/cm?user=admin&password=l4c1j4yd33Du5l0&cmnd=STATUS+5"
url = "http://" + sys_ip + "/cm?user=admin&password=l4c1j4yd33Du5l0&cmnd=STATUS+1"
#print(url)
resp = requests.get(url)
json_str = resp.content.decode('utf-8')
#print(json_str)
tasm_data = json.loads(json_str)
#print(tasm_data)
print(tasm_data["StatusPRM"]["OtaUrl"])
except:
#print(sys_ip + " : Not a tasmota!")
pass
else:
#print(sys_ip + " : Not a listening")
pass
def wifi_setup():
SSID1 = input(">> SID1 ? (default=jaydee) : ") or "jaydee"
PASS1 = input(">> PASS1 ? (default=) : ") or ""
SSID2 = input(">> SID2 ? (default=smart_home) : ") or "smart_home"
PASS2 = input(">> PASS2 ? (default=) : ") or ""
cmnd = "nmap -sP 192.168.77.*|grep \"Nmap scan report\"|egrep -o \"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\""
#print(cmnd)
status, output = subprocess.getstatusoutput(cmnd)
# print(output)
ips = output.splitlines()
for sys_ip in ips:
print("Thsis is ip : " + sys_ip)
cmnd = "nmap " + sys_ip + " -p80|grep \"80/tcp open http\""
status, output = subprocess.getstatusoutput(cmnd)
# print("status of 80 : " + str(status))
if status == 0:
try:
# url = "http://" + sys_ip + "/cm?user=admin&password=l4c1j4yd33Du5l0&cmnd=STATUS+5"
url = "http://" + sys_ip + "/wi?s1=" + SSID1 + "&p1=" + PASS1 + "&s2=" + SSID2 + "&p2=" + PASS2 + "&h=work_lights&save="
print(url)
if _APPLY:
requests.get(url)
else:
print("Test mode")
except:
#print(sys_ip + " : Not a tasmota!")
pass
else:
#print(sys_ip + " : Not a listening")
pass
def mqtt_setup():
MQTT_BROKER = input(">> MQTT Broker ? (default=192.168.77.106) : ") or "192.168.77.106"
MQTT_USER = input(">> MQTT User ? (default=jaydee) : ") or "jaydee"
MQTT_PASS = input(">> MQTT Password ? (default=jaydee1) : ") or "jaydee1"
cmnd = "nmap -sP 192.168.77.*|grep \"Nmap scan report\"|egrep -o \"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\""
#print(cmnd)
status, output = subprocess.getstatusoutput(cmnd)
# print(output)
ips = output.splitlines()
tasm_data = {}
for sys_ip in ips:
print("Thsis is ip : " + sys_ip)
cmnd = "nmap " + sys_ip + " -p80|grep \"80/tcp open http\""
status, output = subprocess.getstatusoutput(cmnd)
# print("status of 80 : " + str(status))
if status == 0:
try:
# url = "http://" + sys_ip + "/cm?user=admin&password=l4c1j4yd33Du5l0&cmnd=STATUS+5"
url = "http://" + sys_ip + "/cm?cmnd=Backlog%20MqttHost%20" + MQTT_BROKER + "%3BMqttUser%20" + MQTT_USER + "%3BMqttPassword%20" + MQTT_PASS
print(url)
if _APPLY:
requests.get(url)
except:
#print(sys_ip + " : Not a tasmota!")
pass
else:
#print(sys_ip + " : Not a listening")
pass
if _BACKUP:
create_backup()
if _MQTT_SETUP:
mqtt_setup()
if _WIFI_SETUP:
wifi_setup()
if _UPGRADE_FW:
upgrade_fw()