mirror of
https://gitlab.sectorq.eu/jaydee/python.git
synced 2025-12-13 18:24:53 +01:00
212 lines
7.7 KiB
Python
212 lines
7.7 KiB
Python
import paho.mqtt.client as mqtt
|
|
from getmac import get_mac_address
|
|
import platform
|
|
import socket
|
|
import re
|
|
import subprocess
|
|
import os
|
|
import json
|
|
import sys
|
|
import time
|
|
import json
|
|
import datetime
|
|
import ctypes
|
|
import getopt
|
|
import requests
|
|
# Green check mark printed after a successful backup.  It is written as the
# code point U+2714: the former "\xE2\x9C\x94" was the UTF-8 *byte* sequence,
# which inside a Python 3 str is three unrelated characters, not a check mark.
CHECK_MARK = "\033[0;32m\u2714\033[0m"

# Action flags; each one is switched on by the matching command-line option.
_MQTT_SETUP = _BACKUP = _APPLY = _WIFI_SETUP = _UPGRADE_FW = False

try:
    # Every option tested in the loop below has to be declared here, otherwise
    # getopt rejects it before the loop ever runs.  "-a/--apply", "--mqtt",
    # "--wifi" and "--upgrade_fw" used to be missing, so _APPLY could never be
    # set and the long spellings aborted the script.
    # "-h" still expects an argument ("h:"), as before.
    opts, args = getopt.getopt(
        sys.argv[1:],
        "wah:bmu",
        [
            "command=",
            "help",
            "output=",
            "backup",
            "mqtt",
            "wifi",
            "apply",
            "upgrade_fw",
        ],
    )
except getopt.GetoptError as err:
    # Print the parser's message (e.g. "option -x not recognized") and exit.
    print(str(err))
    sys.exit(2)

output = None

# Translate the parsed options into the module-level flags.
for o, a in opts:
    if o == "-h":
        _ACTION = True
    elif o in ("-b", "--backup"):
        _BACKUP = True
    elif o in ("-m", "--mqtt"):
        _MQTT_SETUP = True
    elif o in ("-w", "--wifi"):
        _WIFI_SETUP = True
    elif o in ("-a", "--apply"):
        _APPLY = True
    elif o in ("-u", "--upgrade_fw"):
        _UPGRADE_FW = True
    else:
        # Any other accepted option (--help, --command, --output) lands here.
        _WIZZARD = True
|
|
|
|
|
|
|
|
|
|
def create_backup():
    """Back up every Tasmota device that answers on 192.168.77.*.

    The subnet is ping-scanned with nmap.  Each host with an open HTTP port
    is asked for STATUS 5 / 2 / 4 (the ``StatusNET``, ``StatusFWR`` and
    ``StatusMEM`` blocks); hosts that do not answer with that JSON are
    skipped silently.  For the remaining hosts a summary line is printed,
    ``/dl`` is downloaded and the response body is stored as
    ``<Hostname>_<Version>.bin`` in the backup directory, followed by a
    check mark.  Download or write failures are reported on stderr.
    """
    # NOTE(review): the web credentials are hard-coded in this file; consider
    # moving them to a config file or environment variable.
    credentials = "user=admin&password=l4c1j4yd33Du5l0"
    http_timeout = 10  # seconds; without it one silent host blocks the run

    # "grep -Eo" replaces the obsolescent "egrep -o"; the dots are escaped
    # explicitly so the literal no longer relies on invalid "\." escapes.
    cmnd = "nmap -sP 192.168.77.*|grep \"Nmap scan report\"|grep -Eo \"[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\""
    print(cmnd)
    _, output = subprocess.getstatusoutput(cmnd)

    print(output)
    # getstatusoutput() also captures stderr, so warnings can end up in the
    # output; only lines that really are dotted quads go into a shell command.
    dotted_quad = re.compile(r"[0-9]{1,3}(?:\.[0-9]{1,3}){3}")
    ips = [line.strip() for line in output.splitlines() if dotted_quad.fullmatch(line.strip())]

    for sys_ip in ips:
        # nmap pads its columns, so tolerate any amount of spacing.
        cmnd = "nmap " + sys_ip + " -p80|grep -E \"80/tcp +open +http\""
        status, output = subprocess.getstatusoutput(cmnd)
        if status != 0:
            # Nothing listening on port 80.
            continue

        # Fresh dict per host so data from a previous device cannot leak in.
        tasm_data = {}
        try:
            for number, key in (("5", "StatusNET"), ("2", "StatusFWR"), ("4", "StatusMEM")):
                url = "http://" + sys_ip + "/cm?" + credentials + "&cmnd=STATUS+" + number
                resp = requests.get(url, timeout=http_timeout)
                tasm_data[key] = json.loads(resp.content.decode('utf-8'))[key]
            hostname = tasm_data['StatusNET']['Hostname']
            version = tasm_data['StatusFWR']["Version"]
            print(sys_ip + " : Hostname : " + hostname + " : Version : " + version + " : Memory : " + str(tasm_data["StatusMEM"]["ProgramFlashSize"]))
        except (requests.RequestException, ValueError, KeyError, TypeError):
            # Unreachable, not JSON, or not the expected layout: not a Tasmota.
            continue

        backup_path = "/media/nas/nas-data/_BACKUP/tasmota/" + hostname + "_" + version + ".bin"
        try:
            resp = requests.get("http://" + sys_ip + "/dl?" + credentials, timeout=http_timeout)
            # Never store an HTTP error page as if it were a backup.
            resp.raise_for_status()
            with open(backup_path, "wb") as backup_file:
                backup_file.write(resp.content)
        except (requests.RequestException, OSError) as err:
            print(sys_ip + " : backup failed : " + str(err), file=sys.stderr)
            continue
        print(CHECK_MARK)
|
|
|
|
def upgrade_fw():
    """Print the OTA URL configured on every device found on 192.168.77.*.

    The subnet is ping-scanned with nmap.  Each host with an open HTTP port
    is asked for STATUS 1 and the ``OtaUrl`` field of its ``StatusPRM`` block
    is printed.  As written, this only reports the URL; no upgrade command is
    sent.  Hosts that do not answer with the expected JSON are skipped
    silently.
    """
    # NOTE(review): the web credentials are hard-coded in this file; consider
    # moving them to a config file or environment variable.
    credentials = "user=admin&password=l4c1j4yd33Du5l0"
    http_timeout = 10  # seconds; without it one silent host blocks the run

    # "grep -Eo" replaces the obsolescent "egrep -o"; the dots are escaped
    # explicitly so the literal no longer relies on invalid "\." escapes.
    cmnd = "nmap -sP 192.168.77.*|grep \"Nmap scan report\"|grep -Eo \"[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\""
    _, output = subprocess.getstatusoutput(cmnd)

    # getstatusoutput() also captures stderr, so warnings can end up in the
    # output; only lines that really are dotted quads go into a shell command.
    dotted_quad = re.compile(r"[0-9]{1,3}(?:\.[0-9]{1,3}){3}")
    ips = [line.strip() for line in output.splitlines() if dotted_quad.fullmatch(line.strip())]

    for sys_ip in ips:
        print("Thsis is ip : " + sys_ip)
        # nmap pads its columns, so tolerate any amount of spacing.
        cmnd = "nmap " + sys_ip + " -p80|grep -E \"80/tcp +open +http\""
        status, output = subprocess.getstatusoutput(cmnd)
        if status != 0:
            # Nothing listening on port 80.
            continue

        url = "http://" + sys_ip + "/cm?" + credentials + "&cmnd=STATUS+1"
        try:
            resp = requests.get(url, timeout=http_timeout)
            tasm_data = json.loads(resp.content.decode('utf-8'))
            print(tasm_data["StatusPRM"]["OtaUrl"])
        except (requests.RequestException, ValueError, KeyError, TypeError):
            # Unreachable, not JSON, or not the expected layout: not a Tasmota.
            continue
|
|
|
|
|
|
def wifi_setup():
    """Push two Wi-Fi networks to every HTTP device found on 192.168.77.*.

    Asks interactively for two SSID/password pairs, ping-scans the subnet
    with nmap and, for each host with an open HTTP port, builds the ``/wi``
    save URL and prints it.  The request is only sent when ``_APPLY`` is set;
    otherwise "Test mode" is printed.  Hosts that cannot be reached are
    skipped silently.
    """
    from urllib.parse import quote

    SSID1 = input(">> SID1 ? (default=jaydee) : ") or "jaydee"
    PASS1 = input(">> PASS1 ? (default=) : ") or ""
    SSID2 = input(">> SID2 ? (default=smart_home) : ") or "smart_home"
    PASS2 = input(">> PASS2 ? (default=) : ") or ""
    http_timeout = 10  # seconds; without it one silent host blocks the run

    # Percent-encode the user input: characters such as "&", "#", "+" or a
    # space would otherwise corrupt the query string.
    query = (
        "s1=" + quote(SSID1, safe="")
        + "&p1=" + quote(PASS1, safe="")
        + "&s2=" + quote(SSID2, safe="")
        + "&p2=" + quote(PASS2, safe="")
        # NOTE(review): every device gets the same hostname "work_lights" -
        # confirm this is intended.
        + "&h=work_lights&save="
    )

    # "grep -Eo" replaces the obsolescent "egrep -o"; the dots are escaped
    # explicitly so the literal no longer relies on invalid "\." escapes.
    cmnd = "nmap -sP 192.168.77.*|grep \"Nmap scan report\"|grep -Eo \"[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\""
    _, output = subprocess.getstatusoutput(cmnd)

    # getstatusoutput() also captures stderr, so warnings can end up in the
    # output; only lines that really are dotted quads go into a shell command.
    dotted_quad = re.compile(r"[0-9]{1,3}(?:\.[0-9]{1,3}){3}")
    ips = [line.strip() for line in output.splitlines() if dotted_quad.fullmatch(line.strip())]

    for sys_ip in ips:
        print("Thsis is ip : " + sys_ip)
        # nmap pads its columns, so tolerate any amount of spacing.
        cmnd = "nmap " + sys_ip + " -p80|grep -E \"80/tcp +open +http\""
        status, output = subprocess.getstatusoutput(cmnd)
        if status != 0:
            # Nothing listening on port 80.
            continue

        url = "http://" + sys_ip + "/wi?" + query
        print(url)
        if not _APPLY:
            print("Test mode")
            continue
        try:
            requests.get(url, timeout=http_timeout)
        except requests.RequestException:
            # Best effort: an unreachable device must not stop the sweep.
            continue
|
|
|
|
def mqtt_setup():
    """Push MQTT broker settings to every HTTP device found on 192.168.77.*.

    Asks interactively for broker host, user and password, ping-scans the
    subnet with nmap and, for each host with an open HTTP port, builds a
    ``Backlog MqttHost ...;MqttUser ...;MqttPassword ...`` command URL and
    prints it.  The request is only sent when ``_APPLY`` is set; otherwise
    "Test mode" is printed, matching ``wifi_setup``.  Hosts that cannot be
    reached are skipped silently.
    """
    from urllib.parse import quote

    MQTT_BROKER = input(">> MQTT Broker ? (default=192.168.77.106) : ") or "192.168.77.106"
    MQTT_USER = input(">> MQTT User ? (default=jaydee) : ") or "jaydee"
    MQTT_PASS = input(">> MQTT Password ? (default=jaydee1) : ") or "jaydee1"
    http_timeout = 10  # seconds; without it one silent host blocks the run

    # Percent-encode the user input: characters such as "&", "#", "+" or a
    # space would otherwise corrupt the query string.  The literal "%20" and
    # "%3B" are the already-encoded space and ";" of the Backlog syntax.
    # NOTE(review): unlike the STATUS requests elsewhere in this file, no web
    # user/password is sent here - confirm the devices accept that.
    command = (
        "Backlog%20MqttHost%20" + quote(MQTT_BROKER, safe="")
        + "%3BMqttUser%20" + quote(MQTT_USER, safe="")
        + "%3BMqttPassword%20" + quote(MQTT_PASS, safe="")
    )

    # "grep -Eo" replaces the obsolescent "egrep -o"; the dots are escaped
    # explicitly so the literal no longer relies on invalid "\." escapes.
    cmnd = "nmap -sP 192.168.77.*|grep \"Nmap scan report\"|grep -Eo \"[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\""
    _, output = subprocess.getstatusoutput(cmnd)

    # getstatusoutput() also captures stderr, so warnings can end up in the
    # output; only lines that really are dotted quads go into a shell command.
    dotted_quad = re.compile(r"[0-9]{1,3}(?:\.[0-9]{1,3}){3}")
    ips = [line.strip() for line in output.splitlines() if dotted_quad.fullmatch(line.strip())]

    for sys_ip in ips:
        print("Thsis is ip : " + sys_ip)
        # nmap pads its columns, so tolerate any amount of spacing.
        cmnd = "nmap " + sys_ip + " -p80|grep -E \"80/tcp +open +http\""
        status, output = subprocess.getstatusoutput(cmnd)
        if status != 0:
            # Nothing listening on port 80.
            continue

        url = "http://" + sys_ip + "/cm?cmnd=" + command
        print(url)
        if not _APPLY:
            print("Test mode")
            continue
        try:
            requests.get(url, timeout=http_timeout)
        except requests.RequestException:
            # Best effort: an unreachable device must not stop the sweep.
            continue
|
|
|
|
|
|
# Run each action whose command-line flag was set, always in this order:
# backup, MQTT setup, Wi-Fi setup, firmware check.
for requested, action in (
    (_BACKUP, create_backup),
    (_MQTT_SETUP, mqtt_setup),
    (_WIFI_SETUP, wifi_setup),
    (_UPGRADE_FW, upgrade_fw),
):
    if requested:
        action()