mirror of
https://gitlab.sectorq.eu/jaydee/python.git
synced 2025-12-14 02:34:53 +01:00
130 lines
3.5 KiB
Python
130 lines
3.5 KiB
Python
#!/usr/bin/env python
|
|
from apa102_pi.driver import apa102
|
|
from pixel_ring import pixel_ring
|
|
from gpiozero import LED
|
|
from time import sleep
|
|
import paho.mqtt.client as mqtt
|
|
import RPi.GPIO as GPIO
|
|
import json
|
|
import os
|
|
import time
|
|
|
|
# Rhasspy profile that holds the MQTT connection settings for this satellite.
rhasspyConfig = '/home/jd/.config/rhasspy/profiles/en/profile.json'

counter = 0
LED_ST = "on"      # on_message only reacts to the hotword while this is "on"
mute = "off"
siteId = ""        # replaced below with the profile's mqtt.site_id
MQTThost = ""      # replaced below with the profile's mqtt.host

# Drive BCM pin 5 high before touching the ring.
# NOTE(review): presumably the LED power-enable line of the mic HAT - confirm.
GPIO.setmode(GPIO.BCM)
GPIO.setup(5, GPIO.OUT)
GPIO.output(5, GPIO.HIGH)

power = LED(5)
power.on()

# Start-up animation: 'echo' pattern at brightness 20, shown for two seconds.
pixel_ring.set_brightness(20)
pixel_ring.change_pattern('echo')
pixel_ring.wakeup()
time.sleep(2)

# Read the "mqtt" section of the profile. Missing "mqtt", "site_id" or "host"
# keys still raise KeyError, so a broken profile fails loudly at start-up.
with open(rhasspyConfig, 'r', encoding='UTF-8') as file:
    MQTTconfig = json.load(file)["mqtt"]

siteId = MQTTconfig["site_id"]
MQTThost = MQTTconfig["host"].strip('"')

# Look the port up by key, not by substring of the serialized JSON, so a value
# that merely contains "port" (e.g. host "support.lan") cannot trigger a
# KeyError. str() lets the port be stored either as a number or as a quoted
# string; 1883 is used when the key is absent.
MQTTport = int(str(MQTTconfig.get("port", 1883)).strip('"'))

# Direct handle on the 12 LEDs, used by the MQTT callbacks for solid colours.
strip = apa102.APA102(num_led=12)
strip.clear_strip()
|
|
|
|
def on_disconnect(client=None, userdata=None, rc=None, *_extra):
    """Show a red ring for three seconds when the MQTT connection drops.

    paho-mqtt invokes this callback with ``(client, userdata, rc)``; a
    zero-parameter function raises TypeError at that point. The arguments are
    accepted and ignored, and all of them are optional so a direct call with
    no arguments keeps working.

    Args:
        client: The paho-mqtt client that lost its connection (unused).
        userdata: User data registered on the client (unused).
        rc: Disconnect result code reported by paho-mqtt (unused).
        *_extra: Any further positional arguments supplied by other callback
            API versions (unused).
    """
    pixel_ring.wakeup()

    # Paint all 12 LEDs red (brightness argument 30) and push the frame.
    for led in range(12):
        strip.set_pixel(led, 255, 0, 0, 30)
    strip.show()

    # Keep the red ring visible; this blocks the caller for three seconds.
    time.sleep(3)
|
|
def on_connect(client, userdata, flags, rc):
    """Subscribe to the Hermes topics and flash the ring green.

    Prints the result code, subscribes ``client`` to every topic this script
    handles, lights all 12 LEDs green for two seconds and then rewrites them
    with a brightness argument of 0.

    Args:
        client: MQTT client to subscribe with.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; only printed.
    """
    print(f"Connected with result code {rc!s}")

    hermes_topics = (
        "hermes/dialogueManager/sessionEnded/#",
        "hermes/hotword/toggleOff/#",
        "hermes/hotword/toggleOn/#",
        "hermes/asr/stopListening",
        "hermes/hotword/hey siri_raspberry-pi/detected",
        "hermes/asr/textCaptured",
    )
    for topic in hermes_topics:
        client.subscribe(topic)

    led_indices = range(0, 12)

    # Connected indicator: medium green at brightness argument 7.
    for led in led_indices:
        strip.set_pixel(led, 0, 127, 0, 7)
    strip.show()

    sleep(2)

    # Same LEDs again with a brightness argument of 0.
    for led in led_indices:
        strip.set_pixel(led, 0, 255, 0, 0)
    strip.show()
|
|
def on_message(client, userdata, msg):
    """Animate the LED ring for Hermes events addressed to this site.

    Only two topics produce a visible reaction, and only when the payload's
    ``siteId`` equals the module-level ``siteId``:

    * hotword detected: ``pixel_ring.think()`` (only while ``LED_ST`` is "on")
    * ASR text captured: two short wakeup/off blinks

    Every other message is ignored. Payloads that are not valid JSON, are not
    JSON objects, or carry no ``siteId`` are ignored as well instead of
    raising inside the MQTT callback.

    Args:
        client: MQTT client that received the message (unused).
        userdata: Unused.
        msg: Received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    try:
        payload = json.loads(msg.payload)
    except ValueError:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return

    # A message without a siteId cannot be addressed to this site.
    if not isinstance(payload, dict) or "siteId" not in payload:
        return
    if payload["siteId"] != siteId:
        return

    if msg.topic == "hermes/hotword/hey siri_raspberry-pi/detected":
        if LED_ST == "on":
            pixel_ring.think()
    elif msg.topic == "hermes/asr/textCaptured":
        # Blink twice; the sleeps block the caller for about 0.9 s in total.
        pixel_ring.wakeup()
        time.sleep(0.3)
        pixel_ring.off()
        time.sleep(0.3)
        pixel_ring.wakeup()
        time.sleep(0.3)
        pixel_ring.off()
    # All other subscribed topics are received but need no LED change.
|
|
|
|
# Build the MQTT client, register the callbacks and run until interrupted.
client = mqtt.Client()

# NOTE(review): broker credentials are hard-coded in source. They can be
# overridden with MQTT_USERNAME / MQTT_PASSWORD in the environment; the
# previous values remain the fallback so existing deployments keep working.
client.username_pw_set(
    os.environ.get("MQTT_USERNAME", "jaydee"),
    password=os.environ.get("MQTT_PASSWORD", "jaydee1"),
)

client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message

# Third argument is the keepalive interval in seconds.
client.connect(MQTThost, MQTTport, 15)

# Blocks here, processing network traffic and dispatching the callbacks.
client.loop_forever()
|