Compare commits

..

6 Commits

Author SHA1 Message Date
jaydee 8f57304112 klal
Gitea Actions Demo / Explore-Gitea-Actions (push) Has been cancelled
2026-03-30 12:12:59 +02:00
jaydee 6d496a04af build
Gitea Actions Demo / Explore-Gitea-Actions (push) Has been cancelled
2026-03-29 21:44:57 +02:00
jaydee 99d4075edd klal
Gitea Actions Demo / Explore-Gitea-Actions (push) Has been cancelled
2026-03-27 12:24:26 +01:00
ladislav.dusa 8fb84b3c47 build
Gitea Actions Demo / Explore-Gitea-Actions (push) Has been cancelled
2026-03-25 14:43:19 +01:00
jaydee e3cd8f5843 klal
Gitea Actions Demo / Explore-Gitea-Actions (push) Has been cancelled
2026-03-24 22:18:34 +01:00
jaydee 68775c50da build
Gitea Actions Demo / Explore-Gitea-Actions (push) Has been cancelled
2026-03-19 21:25:33 +01:00
25 changed files with 1196 additions and 37 deletions
+6 -1
View File
@@ -1,6 +1,8 @@
- hosts: datacenter
name: Roles
gather_facts: false
vars_files:
- jaydee.yml # vault file
roles:
- name: zabbix_proxy_service
role: zabbix_proxy_service
@@ -106,4 +108,7 @@
tags: terraform
- name: setup_repo_sync
role: setup_repo_sync
tags: setup_repo_sync
tags: setup_repo_sync
- name: puppet-agent
role: puppet-agent
tags: puppet-agent
+4 -6
View File
@@ -26,14 +26,11 @@ datacenter:
ansible_user: root
ryzen:
hosts:
192.168.77.15:
ryzen.home.lan:
vars:
ansible_python_interpreter: auto_silent
ansible_ssh_user: root
ansible_ssh_pass: lacijaydee
ansible_password: lacijaydee
ansible_become_user: root
ansible_become_password: lacijaydee
ansible_ssh_user: jd
ansible_ssh_private_key_file: ssh_key.pem
omv:
hosts:
192.168.77.189:
@@ -177,6 +174,7 @@ datacenter:
desktops:
hosts:
morefine.home.lan:
asus.home.lan:
vars:
ansible_python_interpreter: /usr/bin/python3
ansible_ssh_user: jd
+26 -25
View File
@@ -1,26 +1,27 @@
$ANSIBLE_VAULT;1.1;AES256
38393362333061613238333964643062316633323761346333646464373437353630656635323561
6433353166343337346433643839363866383831653666370a366231343233626366643466353762
39346263313962343730333364643461376436663061323633623537346238316363343735343136
6631653665653363660a663136613961396163636163636465333163363537663634386261393537
30333233316236343161663136313635353162633930646431613335313264303565616435636662
61343363386430303438333934393362643661356636316432396266623762393833333539633166
64623230353861356530653263306637323964336231353665346264333463663030366338616330
62373461363333643838613034636537353233393137656135386131656130363462653633366336
66323236346234616639383066643638303937623938333136623131396334623263653763656131
34306366343738663930666536663664613732343566323430353638393662666130393366663861
35346532656531326366343963396334393432343931656332663933616131643161363933383738
63616164316637626639653835356433613764663361393933346239343866323164643765336636
64373764373964613864386133623835303163663933396534303132323630326236336132393063
63666264613331306165326232386166633039333030346338383632363739393566383863663761
64656332643838633037373030336265663738343563303262386437353066313466393764616231
35653063623263666439393635613037323439616630363837353930626632313437303762633433
37646664376532343632613162333039326463646231383939373965623434306665383662633861
31383335393236313362323762633833366461643264633730636133646335393532376133386432
66666534393431333235326139333936616365356434636638346433663566666633306237313264
32313031663166353465653863643762313163386137306238383436343631303630643266353035
35306636643735396630613538326461656132623931666234666232396238313062343265396666
62343766616338383962386165393930396536666663643963383237663236666664303837613135
35666234363462616562366333396366353838353763613639653936303039613863323565306263
35373266303035613638653338346434616634353534393838646131366432633266353636383433
6638
66616666346233663164303662323266303431383638623139373163636566636230313862646664
3866616633376432666530663963346561613538633563610a653835323733353465306562646631
33653763636238323363653439316634623134366266363032353561343136353037323635656661
3436333637616362360a316636386463656466653964623166323639323834386535376532343335
62303631636135363630343833613963313564386636636561393039326361356232333162333336
30306266623030623632366135313335386466393563343030356635646563306434336137396439
39666531336233646638643730386137323233653061373330373430666639393230373566626534
36643335356566396239396265663738373838643234333438356630633031323635663666666530
37363163356136306366333036656162643630613039623730363365636133373264313832623066
31393233613665643261366337643332363032303461636138313635333130626139666666376439
38373437326164616434363662613535323362363561313962396665356538336333373766626361
37626161663238356162326461613634363462623534336632306331356239363562396662396130
63626337383232366335643561613766313038643464323836643130626533633339623633326233
34393235376565616463386539366638336633343430363639393666633932316266653466393465
35303832636562376665303961656134363463376637643438353038343631303164326130323966
39346563663062623937633639303739393665343838666137316132633631376230633763323535
31343965363965303561356638346130316139303832653863623762393664616335646364643437
37336436643362393935343037633730666362636338343466646632646266303930656436313263
36613464313735303566613762396261373737316563646266646237386139633663333564346136
33353761363663613865623863666132383730396631393563643434346437326365373238666639
65626339313264666439653430306331363266383232383132653562666165383436346464663265
31643830656136636533363265393362393934343531303362653232646232643563313738613939
31323365333461636637356265393665646235643530376331303931623138616637386362326132
32656334643864626136353934363433316463303637336363653934623834326337663334346262
34376539353230303639633235383635613739343633613361356665373662383866343439383465
3137313662333665303064646362616639333031633239626462
+1 -1
View File
@@ -3,7 +3,7 @@ mkdir /tmp/certgen 2>/dev/null
cd /tmp/certgen
openssl genrsa -aes256 -out ca-key.pem -passout pass:"foobarpwd" 4096
openssl req -new -x509 -days 365 -key ca-key.pem -sha256 -out ca.pem -subj "/C=SK/ST=Slovakia/L=SK/O=sectorq.cloud/OU=IT/CN=sectorq.cloud" -passin pass:"foobarpwd"
openssl req -new -x509 -days 365 -key ca-key.pem -sha256 -out ca.pem -subj "/C=SK/ST=Slovakia/L=SK/O=sectorq.eu/OU=IT/CN=sectorq.eu" -passin pass:"foobarpwd"
openssl genrsa -out server-key.pem 4096
if [[ `hostname` == "nas" ]]
then
+1 -1
View File
@@ -31,6 +31,6 @@
- name: Upgrade flatpack
ansible.builtin.command: flatpak update -y
when: inventory_hostname == 'morefine.home.lan'
when: inventory_hostname in ['morefine.home.lan','ryzen.home.lan','asus.home.lan']
register: logo
changed_when: "logo.rc == 0"
+7
View File
@@ -0,0 +1,7 @@
- name: Upgrade
become: "{{ false if inventory_hostname == 'nas.home.lan' else true }}"
block:
- name: Install the dbeaver flatpak
community.general.flatpak:
name: app/io.dbeaver.DBeaverCommunity/x86_64/stable
state: present
+4
View File
@@ -0,0 +1,4 @@
username: "jd"
user_password: "{{ 'l4c1j4yd33Du5lo' | password_hash('sha512') }}"
new_root_password: "{{ 'l4c1j4yd33Du5lo' | password_hash('sha512') }}"
user_groups: "sudo"
+1
View File
@@ -0,0 +1 @@
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCjPi74gAHT2GBbKWWmmUrzzrhKQN3mnz3DTWn02KhbgWs6MRLlTrn2fLqB4hkoVEOat29wAOJ2xuY4aEjGfO6nl/3ka3oqslwxW9GqFCRXVHnF3b+Wly+jkn3tbht+gjFK4x0np1WZbpTI/nR7gElWa774LCHy5i4fm5ISoCHzRMsKd3dVF4YOte6QC9Uu4uQQjpZWksYRjANtXAk5EDEOERz+Dh8RDkXbGYW2vx1ihNQGh53z3c6W6bajGq3qx/+G905MtRgutbHWOOYSE8o2q3vaDkCNA6Fjd3P6gHTPcRjFEfoHc8Vxp0vjR9e6ol8X7D+GEL6g6QN656772Zo3/Qka0rJ/zMnT3gZ2S4i0CzmqDoecbzDCX7ywwuwgvlmJGOswgwwOKP/E0/RnWi63BUFV/fU9o5nr9ZXgkiRa9ZCzubHZhxyN6Z7EhcOVnMbvfV1W5Fn/vbj84YudCLmzrk5qvpQ+qVsMXi1GPIEyFwyg/Afu5JTpNE5+4ViFzTM= jd@morefine
+4
View File
@@ -0,0 +1,4 @@
username: "jd"
user_password: "{{ 'l4c1j4yd33Du5lo' | password_hash('sha512') }}"
new_root_password: "{{ 'l4c1j4yd33Du5lo' | password_hash('sha512') }}"
user_groups: "sudo"
+38
View File
@@ -0,0 +1,38 @@
Role Name
=========
A brief description of the role goes here.
Requirements
------------
Any pre-requisites that may not be covered by Ansible itself or the role should be mentioned here. For instance, if the role uses the EC2 module, it may be a good idea to mention in this section that the boto package is required.
Role Variables
--------------
A description of the settable variables for this role should go here, including any variables that are in defaults/main.yml, vars/main.yml, and any variables that can/should be set via parameters to the role. Any variables that are read from other roles and/or the global scope (ie. hostvars, group vars, etc.) should be mentioned here as well.
Dependencies
------------
A list of other roles hosted on Galaxy should go here, plus any details in regards to parameters that may need to be set for other roles, or variables that are used from other roles.
Example Playbook
----------------
Including an example of how to use your role (for instance, with variables passed in as parameters) is always nice for users too:
- hosts: servers
roles:
- { role: username.rolename, x: 42 }
License
-------
BSD
Author Information
------------------
An optional section for the role authors to include contact information, or a website (HTML is not allowed).
+2
View File
@@ -0,0 +1,2 @@
---
# defaults file for puppet-agent
+2
View File
@@ -0,0 +1,2 @@
---
# handlers file for puppet-agent
+52
View File
@@ -0,0 +1,52 @@
galaxy_info:
author: your name
description: your role description
company: your company (optional)
# If the issue tracker for your role is not on github, uncomment the
# next line and provide a value
# issue_tracker_url: http://example.com/issue/tracker
# Choose a valid license ID from https://spdx.org - some suggested licenses:
# - BSD-3-Clause (default)
# - MIT
# - GPL-2.0-or-later
# - GPL-3.0-only
# - Apache-2.0
# - CC-BY-4.0
license: license (GPL-2.0-or-later, MIT, etc)
min_ansible_version: 2.1
# If this a Container Enabled role, provide the minimum Ansible Container version.
# min_ansible_container_version:
#
# Provide a list of supported platforms, and for each platform a list of versions.
# If you don't wish to enumerate all versions for a particular platform, use 'all'.
# To view available platforms and versions (or releases), visit:
# https://galaxy.ansible.com/api/v1/platforms/
#
# platforms:
# - name: Fedora
# versions:
# - all
# - 25
# - name: SomePlatform
# versions:
# - all
# - 1.0
# - 7
# - 99.99
galaxy_tags: []
# List tags for your role here, one per line. A tag is a keyword that describes
# and categorizes the role. Users find roles by searching for tags. Be sure to
# remove the '[]' above, if you add tags to this list.
#
# NOTE: A tag is limited to a single word comprised of alphanumeric characters.
# Maximum 20 tags per role.
dependencies: []
# List your role dependencies here, one per line. Be sure to remove the '[]' above,
# if you add dependencies to this list.
+46
View File
@@ -0,0 +1,46 @@
- name: Install puppet agent
become: "{{ false if inventory_hostname == 'nas.home.lan' else true }}"
vars:
puppet_server: active.home.lan
block:
# - name: Ensure Puppet repo is present (Debian/Ubuntu)
# apt_repository:
# repo: "deb http://apt.puppetlabs.com {{ ansible_distribution_release }} puppet6"
# state: present
# when: ansible_os_family == "Debian"
- name: Facts
ansible.builtin.setup:
when: ansible_facts.architecture is not defined
- name: Install Puppet agent package
ansible.builtin.package:
name: puppet-agent
state: present
- name: Create Puppet configuration directory
ansible.builtin.file:
path: /etc/puppetlabs/puppet
state: directory
mode: '0755'
- name: Deploy puppet.conf
ansible.builtin.template:
src: puppet.conf.j2
dest: /etc/puppet/puppet.conf
owner: root
group: root
mode: '0644'
- name: Enable and start puppet agent
ansible.builtin.systemd:
name: puppet
enabled: true
state: started
- name: Trigger puppet agent run once
ansible.builtin.command: /usr/bin/puppet agent -t
register: puppet_run
changed_when: puppet_run.rc != 0
- name: Debug puppet run output
ansible.builtin.debug:
var: puppet_run.stdout_lines
@@ -0,0 +1,5 @@
[main]
server = {{ puppet_server }}
certname = {{ inventory_hostname }}
environment = production
runinterval = 10m
+2
View File
@@ -0,0 +1,2 @@
localhost
+5
View File
@@ -0,0 +1,5 @@
---
- hosts: localhost
remote_user: root
roles:
- puppet-agent
+3
View File
@@ -0,0 +1,3 @@
---
# vars file for puppet-agent
puppet_server: active.home.lan
+40
View File
@@ -0,0 +1,40 @@
- name: Set banner
become: "{{ false if inventory_hostname == 'nas.home.lan' else true }}"
block:
- name: Install packages
ansible.builtin.apt:
name:
- figlet
- toilet
- name: Create Banner
ansible.builtin.command: |
figlet -c {{ (inventory_hostname | split('.'))[0] }} -f slant
register: logo
changed_when: "logo.rc == 0"
- name: Creating a file with content
ansible.builtin.copy:
dest: "/etc/motd"
content: |
{{ logo.stdout }}
owner: 0
group: 0
mode: "0777"
- name: Reconfigure sshd
ansible.builtin.lineinfile:
path: /etc/ssh/sshd_config
regexp: "^Banner.* "
line: "#Banner /etc/banner"
- name: Reconfigure sshd
ansible.builtin.lineinfile:
path: /etc/ssh/sshd_config
regexp: "^#PrintLastLog.* "
line: "PrintLastLog no"
- name: Sshd
ansible.builtin.service:
name: ssh.service
state: restarted
+3
View File
@@ -1,6 +1,9 @@
- name: SSH config Setup
become: "{{ false if inventory_hostname == 'nas.home.lan' else true }}"
block:
- name: Include vault
ansible.builtin.include_vars:
file: jaydee.yml
- name: Upload config
ansible.builtin.copy:
src: config
@@ -0,0 +1 @@
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJSABeQ3x7nM3HBil1LpGYLrdz5NbXVl6l6dNMr0lnwZ jd@ryzen
+3 -3
View File
@@ -18,8 +18,8 @@
when: inventory_hostname != 'nas.home.lan'
- name: Upload key
ansible.builtin.copy:
src: id_rsa.pub
dest: /home/jd/.ssh/id_rsa.pub
src: id_ed25519_homelab.pub
dest: /home/jd/.ssh/id_ed25519_homelab.pub
mode: '0600'
owner: jd
group: jd
@@ -28,4 +28,4 @@
ansible.posix.authorized_key:
user: jd
state: present
key: "{{ lookup('file', '/home/jd/.ssh/id_rsa.pub') }}"
key: "{{ lookup('file', '/home/jd/.ssh/id_ed25519_homelab.pub') }}"
Binary file not shown.
+5
View File
@@ -0,0 +1,5 @@
requests
pycryptodome
charset-normalizer
Pillow
colorama
+935
View File
@@ -0,0 +1,935 @@
from abc import ABC, abstractmethod
import argparse
import base64
import hashlib
import hmac
import json
import logging
import os
import random
import re
import socket
import sys
import tempfile
import threading
import time
from getpass import getpass
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse
import requests
from colorama import Fore, Style, init
try:
from Crypto.Cipher import ARC4
except ModuleNotFoundError:
from Cryptodome.Cipher import ARC4
from PIL import Image
if sys.platform != "win32":
import readline
SERVERS = ["cn", "de", "us", "ru", "tw", "sg", "in", "i2"]
NAME_TO_LEVEL = {
"CRITICAL": logging.CRITICAL,
"FATAL": logging.FATAL,
"ERROR": logging.ERROR,
"WARN": logging.WARNING,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
"NOTSET": logging.NOTSET,
}
parser = argparse.ArgumentParser()
parser.add_argument("-ni", "--non_interactive", required=False, help="Non-interactive mode", action="store_true")
parser.add_argument("-u", "--username", required=False, help="Username")
parser.add_argument("-p", "--password", required=False, help="Password")
parser.add_argument("-s", "--server", required=False, help="Server", choices=[*SERVERS, ""])
parser.add_argument("-l", "--log_level", required=False, help="Log level", default="CRITICAL", choices=list(NAME_TO_LEVEL.keys()))
parser.add_argument("-o", "--output", required=False, help="Output file")
parser.add_argument("--host", required=False, help="Host")
args = parser.parse_args()
if args.non_interactive and (not args.username or not args.password):
parser.error("You need to specify username and password or run as interactive.")
init(autoreset=True)
class ColorFormatter(logging.Formatter):
COLORS = {
"CRITICAL": Fore.RED + Style.BRIGHT,
"FATAL": Fore.RED + Style.BRIGHT,
"ERROR": Fore.RED,
"WARN": Fore.YELLOW,
"WARNING": Fore.YELLOW,
"INFO": Fore.GREEN,
"DEBUG": Fore.BLUE,
}
def format(self, record: logging.LogRecord) -> str:
color = self.COLORS.get(record.levelname, "")
return color + logging.Formatter.format(self, record)
class ColorLogger(logging.Logger):
def __init__(self, name: str) -> None:
level = NAME_TO_LEVEL[args.log_level.upper()]
logging.Logger.__init__(self, name, level)
color_formatter = ColorFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(color_formatter)
self.addHandler(handler)
logging.setLoggerClass(ColorLogger)
_LOGGER = logging.getLogger("token_extractor")
class XiaomiCloudConnector(ABC):
def __init__(self):
self._agent = self.generate_agent()
self._device_id = self.generate_device_id()
self._session = requests.session()
self._ssecurity = None
self.userId = None
self._serviceToken = None
@abstractmethod
def login(self) -> bool:
pass
def get_homes(self, country):
url = self.get_api_url(country) + "/v2/homeroom/gethome"
params = {
"data": '{"fg": true, "fetch_share": true, "fetch_share_dev": true, "limit": 300, "app_ver": 7}'}
return self.execute_api_call_encrypted(url, params)
def get_devices(self, country, home_id, owner_id):
url = self.get_api_url(country) + "/v2/home/home_device_list"
params = {
"data": '{"home_owner": ' + str(owner_id) +
',"home_id": ' + str(home_id) +
', "limit": 200, "get_split_device": true, "support_smart_home": true}'
}
return self.execute_api_call_encrypted(url, params)
def get_dev_cnt(self, country):
url = self.get_api_url(country) + "/v2/user/get_device_cnt"
params = {
"data": '{ "fetch_own": true, "fetch_share": true}'
}
return self.execute_api_call_encrypted(url, params)
def get_beaconkey(self, country, did):
url = self.get_api_url(country) + "/v2/device/blt_get_beaconkey"
params = {
"data": '{"did":"' + did + '","pdid":1}'
}
return self.execute_api_call_encrypted(url, params)
def execute_api_call_encrypted(self, url, params):
headers = {
"Accept-Encoding": "identity",
"User-Agent": self._agent,
"Content-Type": "application/x-www-form-urlencoded",
"x-xiaomi-protocal-flag-cli": "PROTOCAL-HTTP2",
"MIOT-ENCRYPT-ALGORITHM": "ENCRYPT-RC4",
}
cookies = {
"userId": str(self.userId),
"yetAnotherServiceToken": str(self._serviceToken),
"serviceToken": str(self._serviceToken),
"locale": "en_GB",
"timezone": "GMT+02:00",
"is_daylight": "1",
"dst_offset": "3600000",
"channel": "MI_APP_STORE"
}
millis = round(time.time() * 1000)
nonce = self.generate_nonce(millis)
signed_nonce = self.signed_nonce(nonce)
fields = self.generate_enc_params(url, "POST", signed_nonce, nonce, params, self._ssecurity)
response = self._session.post(url, headers=headers, cookies=cookies, params=fields)
if response.status_code == 200:
decoded = self.decrypt_rc4(self.signed_nonce(fields["_nonce"]), response.text)
return json.loads(decoded)
return None
@staticmethod
def get_api_url(country):
return "https://" + ("" if country == "cn" else (country + ".")) + "api.io.mi.com/app"
def signed_nonce(self, nonce):
hash_object = hashlib.sha256(base64.b64decode(self._ssecurity) + base64.b64decode(nonce))
return base64.b64encode(hash_object.digest()).decode("utf-8")
@staticmethod
def signed_nonce_sec(nonce, ssecurity):
hash_object = hashlib.sha256(base64.b64decode(ssecurity) + base64.b64decode(nonce))
return base64.b64encode(hash_object.digest()).decode("utf-8")
@staticmethod
def generate_nonce(millis):
nonce_bytes = os.urandom(8) + (int(millis / 60000)).to_bytes(4, byteorder="big")
return base64.b64encode(nonce_bytes).decode()
@staticmethod
def generate_agent():
agent_id = "".join(
map(lambda i: chr(i), [random.randint(65, 69) for _ in range(13)])
)
random_text = "".join(map(lambda i: chr(i), [random.randint(97, 122) for _ in range(18)]))
return f"{random_text}-{agent_id} APP/com.xiaomi.mihome APPV/10.5.201"
@staticmethod
def generate_device_id():
return "".join(map(lambda i: chr(i), [random.randint(97, 122) for _ in range(6)]))
@staticmethod
def generate_signature(url, signed_nonce, nonce, params):
signature_params = [url.split("com")[1], signed_nonce, nonce]
for k, v in params.items():
signature_params.append(f"{k}={v}")
signature_string = "&".join(signature_params)
signature = hmac.new(base64.b64decode(signed_nonce), msg=signature_string.encode(), digestmod=hashlib.sha256)
return base64.b64encode(signature.digest()).decode()
@staticmethod
def generate_enc_signature(url, method, signed_nonce, params):
signature_params = [str(method).upper(), url.split("com")[1].replace("/app/", "/")]
for k, v in params.items():
signature_params.append(f"{k}={v}")
signature_params.append(signed_nonce)
signature_string = "&".join(signature_params)
return base64.b64encode(hashlib.sha1(signature_string.encode("utf-8")).digest()).decode()
@staticmethod
def generate_enc_params(url, method, signed_nonce, nonce, params, ssecurity):
params["rc4_hash__"] = XiaomiCloudConnector.generate_enc_signature(url, method, signed_nonce, params)
for k, v in params.items():
params[k] = XiaomiCloudConnector.encrypt_rc4(signed_nonce, v)
params.update({
"signature": XiaomiCloudConnector.generate_enc_signature(url, method, signed_nonce, params),
"ssecurity": ssecurity,
"_nonce": nonce,
})
return params
@staticmethod
def to_json(response_text):
return json.loads(response_text.replace("&&&START&&&", ""))
@staticmethod
def encrypt_rc4(password, payload):
r = ARC4.new(base64.b64decode(password))
r.encrypt(bytes(1024))
return base64.b64encode(r.encrypt(payload.encode())).decode()
@staticmethod
def decrypt_rc4(password, payload):
r = ARC4.new(base64.b64decode(password))
r.encrypt(bytes(1024))
return r.encrypt(base64.b64decode(payload))
class PasswordXiaomiCloudConnector(XiaomiCloudConnector):
def __init__(self):
super().__init__()
self._sign = None
self._cUserId = None
self._passToken = None
self._location = None
self._code = None
def login(self) -> bool:
if args.username:
self._username = args.username
else:
print_if_interactive(f"Username {Fore.BLUE}(email, phone number or user ID){Style.RESET_ALL}:")
self._username = input()
if args.password:
self._password = args.password
else:
print_if_interactive(f"Password {Fore.BLUE}(not displayed for privacy reasons){Style.RESET_ALL}:")
self._password = getpass("")
print_if_interactive()
print_if_interactive(f"{Fore.BLUE}Logging in...")
print_if_interactive()
self._session.cookies.set("sdkVersion", "accountsdk-18.8.15", domain="mi.com")
self._session.cookies.set("sdkVersion", "accountsdk-18.8.15", domain="xiaomi.com")
self._session.cookies.set("deviceId", self._device_id, domain="mi.com")
self._session.cookies.set("deviceId", self._device_id, domain="xiaomi.com")
if not self.login_step_1():
print_if_interactive(f"{Fore.RED}Invalid username.")
return False
if not self.login_step_2():
print_if_interactive(f"{Fore.RED}Invalid login or password.")
return False
if self._location and not self._serviceToken and not self.login_step_3():
print_if_interactive(f"{Fore.RED}Unable to get service token.")
return False
return True
def login_step_1(self):
_LOGGER.debug("login_step_1")
url = "https://account.xiaomi.com/pass/serviceLogin?sid=xiaomiio&_json=true"
headers = {
"User-Agent": self._agent,
"Content-Type": "application/x-www-form-urlencoded"
}
cookies = {
"userId": self._username
}
response = self._session.get(url, headers=headers, cookies=cookies)
_LOGGER.debug(response.text)
json_resp = self.to_json(response.text)
if response.status_code == 200:
if "_sign" in json_resp:
self._sign = json_resp["_sign"]
return True
elif "ssecurity" in json_resp:
self._ssecurity = json_resp["ssecurity"]
self.userId = json_resp["userId"]
self._cUserId = json_resp["cUserId"]
self._passToken = json_resp["passToken"]
self._location = json_resp["location"]
self._code = json_resp["code"]
return True
return False
def login_step_2(self) -> bool:
_LOGGER.debug("login_step_2")
url: str = "https://account.xiaomi.com/pass/serviceLoginAuth2"
headers: dict = {
"User-Agent": self._agent,
"Content-Type": "application/x-www-form-urlencoded"
}
fields: dict = {
"sid": "xiaomiio",
"hash": hashlib.md5(str.encode(self._password)).hexdigest().upper(),
"callback": "https://sts.api.io.mi.com/sts",
"qs": "%3Fsid%3Dxiaomiio%26_json%3Dtrue",
"user": self._username,
"_sign": self._sign,
"_json": "true"
}
_LOGGER.debug("login_step_2: URL: %s", url)
_LOGGER.debug("login_step_2: Fields: %s", fields)
response = self._session.post(url, headers=headers, params=fields, allow_redirects=False)
_LOGGER.debug("login_step_2: Response text: %s", response.text)
valid: bool = response is not None and response.status_code == 200
if valid:
json_resp: dict = self.to_json(response.text)
if "captchaUrl" in json_resp and json_resp["captchaUrl"] is not None:
if args.non_interactive:
parser.error("Captcha solution required, rerun in interactive mode")
captcha_code: str = self.handle_captcha(json_resp["captchaUrl"])
if not captcha_code:
_LOGGER.debug("Could not solve captcha.")
return False
# Add captcha code to the fields and retry
fields["captCode"] = captcha_code
_LOGGER.debug("Retrying login with captcha.")
response = self._session.post(url, headers=headers, params=fields, allow_redirects=False)
_LOGGER.debug("login_step_2: Retry Response text: %s", response.text[:1000])
if response is not None and response.status_code == 200:
json_resp = self.to_json(response.text)
else:
_LOGGER.error("Login failed even after captcha.")
return False
if "code" in json_resp and json_resp["code"] == 87001:
print_if_interactive("Invalid captcha.")
return False
valid = "ssecurity" in json_resp and len(str(json_resp["ssecurity"])) > 4
if valid:
self._ssecurity = json_resp["ssecurity"]
self.userId = json_resp.get("userId", None)
self._cUserId = json_resp.get("cUserId", None)
self._passToken = json_resp.get("passToken", None)
self._location = json_resp.get("location", None)
self._code = json_resp.get("code", None)
else:
if "notificationUrl" in json_resp:
if args.non_interactive:
parser.error("2FA solution required, rerun in interactive mode")
verify_url = json_resp["notificationUrl"]
return self.do_2fa_email_flow(verify_url)
else:
_LOGGER.error("login_step_2: Login failed, server returned: %s", json_resp)
else:
_LOGGER.error("login_step_2: HTTP status: %s; Response: %s", response.status_code, response.text[:500])
return valid
def login_step_3(self):
_LOGGER.debug("login_step_3")
headers = {
"User-Agent": self._agent,
"Content-Type": "application/x-www-form-urlencoded"
}
response = self._session.get(self._location, headers=headers)
_LOGGER.debug(response.text)
if response.status_code == 200:
self._serviceToken = response.cookies.get("serviceToken")
return response.status_code == 200
def handle_captcha(self, captcha_url: str) -> str:
# Full URL in case it's relative
if captcha_url.startswith("/"):
captcha_url = "https://account.xiaomi.com" + captcha_url
_LOGGER.debug("Downloading captcha image from: %s", captcha_url)
response = self._session.get(captcha_url, stream=False)
if response.status_code != 200:
_LOGGER.error("Unable to fetch captcha image.")
return ""
print_if_interactive(f"{Fore.YELLOW}Captcha verification required.")
present_image_image(
response.content,
message_url = f"Image URL: {Fore.BLUE}http://{args.host or '127.0.0.1'}:31415",
message_file_saved = "Captcha image saved at: {}",
message_manually_open_file = "Please open {} and solve the captcha."
)
# Ask user for a captcha solution
print_if_interactive(f"Enter captcha as shown in the image {Fore.BLUE}(case-sensitive){Style.RESET_ALL}:")
captcha_solution: str = input().strip()
print_if_interactive()
return captcha_solution
def do_2fa_email_flow(self, notification_url: str) -> bool:
"""
Handles the email-based 2FA flow and extracts ssecurity + serviceToken.
Robust to cases where verifyEmail returns non-JSON/empty body.
"""
# 1) Open notificationUrl (authStart)
headers = {
"User-Agent": self._agent,
"Content-Type": "application/x-www-form-urlencoded"
}
_LOGGER.debug("Opening notificationUrl (authStart): %s", notification_url)
r = self._session.get(notification_url, headers=headers)
_LOGGER.debug("authStart final URL: %s status=%s", r.url, r.status_code)
# 2) Fetch identity options (list)
context = parse_qs(urlparse(notification_url).query)["context"][0]
list_params = {
"sid": "xiaomiio",
"context": context,
"_locale": "en_US"
}
_LOGGER.debug("GET /identity/list params=%s", list_params)
r = self._session.get("https://account.xiaomi.com/identity/list", params=list_params, headers=headers)
_LOGGER.debug("identity/list status=%s", r.status_code)
# 3) Request email ticket
send_params = {
"_dc": str(int(time.time() * 1000)),
"sid": "xiaomiio",
"context": list_params["context"],
"mask": "0",
"_locale": "en_US"
}
send_data = {
"retry": "0",
"icode": "",
"_json": "true",
"ick": self._session.cookies.get("ick", "")
}
_LOGGER.debug("sendEmailTicket POST url=https://account.xiaomi.com/identity/auth/sendEmailTicket params=%s", send_params)
_LOGGER.debug("sendEmailTicket data=%s", send_data)
r = self._session.post("https://account.xiaomi.com/identity/auth/sendEmailTicket",
params=send_params, data=send_data, headers=headers)
try:
jr = r.json()
except Exception:
jr = {}
_LOGGER.debug("sendEmailTicket response status=%s json=%s", r.status_code, jr)
# 4) Ask user for the email code and verify
if args.non_interactive:
parser.error("Email verification code required, rerun without --non_interactive")
print_if_interactive(f"{Fore.YELLOW}Two factor authentication required, please provide the code from the email.")
print_if_interactive()
print_if_interactive("2FA Code:")
code = input().strip()
print_if_interactive()
verify_params = {
"_flag": "8",
"_json": "true",
"sid": "xiaomiio",
"context": list_params["context"],
"mask": "0",
"_locale": "en_US"
}
verify_data = {
"_flag": "8",
"ticket": code,
"trust": "false",
"_json": "true",
"ick": self._session.cookies.get("ick", "")
}
r = self._session.post("https://account.xiaomi.com/identity/auth/verifyEmail",
params=verify_params, data=verify_data, headers=headers)
if r.status_code != 200:
_LOGGER.error("verifyEmail failed: status=%s body=%s", r.status_code, r.text[:500])
return False
try:
jr = r.json()
_LOGGER.debug("verifyEmail response status=%s json=%s", r.status_code, jr)
finish_loc = jr.get("location")
except Exception:
# Non-JSON or empty; try to extract from headers or body
_LOGGER.debug("verifyEmail returned non-JSON, attempting fallback extraction.")
finish_loc = r.headers.get("Location")
if not finish_loc and r.text:
m = re.search(r'https://account\.xiaomi\.com/identity/result/check\?[^"\']+', r.text)
if m:
finish_loc = m.group(0)
# Fallback: directly hit result/check using existing identity_session/context
if not finish_loc:
_LOGGER.debug("Using fallback call to /identity/result/check")
r0 = self._session.get(
"https://account.xiaomi.com/identity/result/check",
params={"sid": "xiaomiio", "context": context, "_locale": "en_US"},
headers=headers,
allow_redirects=False
)
_LOGGER.debug("result/check (fallback) status=%s hop-> %s", r0.status_code, r0.headers.get("Location"))
if r0.status_code in (301, 302) and r0.headers.get("Location"):
finish_loc = r0.url if "serviceLoginAuth2/end" in r0.url else r0.headers["Location"]
if not finish_loc:
_LOGGER.error("Unable to determine finish location after verifyEmail.")
return False
# First hop: GET identity/result/check (do NOT follow redirects to inspect Location)
if "identity/result/check" in finish_loc:
r = self._session.get(finish_loc, headers=headers, allow_redirects=False)
_LOGGER.debug("result/check status=%s hop-> %s", r.status_code, r.headers.get("Location"))
end_url = r.headers.get("Location")
else:
end_url = finish_loc
if not end_url:
_LOGGER.error("Could not find Auth2/end URL in finish chain.")
return False
# 6) Call Auth2/end WITHOUT redirects to capture 'extension-pragma' header containing ssecurity
r = self._session.get(end_url, headers=headers, allow_redirects=False)
_LOGGER.debug("Auth2/end status=%s", r.status_code)
_LOGGER.debug("Auth2/end body(trunc)=%s", r.text[:200])
# Some servers return 200 first (HTML 'Tips' page), then 302 on next call.
if r.status_code == 200 and "Xiaomi Account - Tips" in r.text:
r = self._session.get(end_url, headers=headers, allow_redirects=False)
_LOGGER.debug("Auth2/end(second) status=%s", r.status_code)
ext_prag = r.headers.get("extension-pragma")
if ext_prag:
try:
ep_json = json.loads(ext_prag)
ssec = ep_json.get("ssecurity")
psec = ep_json.get("psecurity")
_LOGGER.debug("extension-pragma present. ssecurity=%s psecurity=%s", ssec, psec)
if ssec:
self._ssecurity = ssec
except Exception as e:
_LOGGER.debug("Failed to parse extension-pragma: %s", e)
if not self._ssecurity:
_LOGGER.error("extension-pragma header missing ssecurity; cannot continue.")
return False
# 7) Find STS redirect and visit it (to set serviceToken cookie)
sts_url = r.headers.get("Location")
if not sts_url and r.text:
idx = r.text.find("https://sts.api.io.mi.com/sts")
if idx != -1:
end = r.text.find('"', idx)
if end == -1:
end = idx + 300
sts_url = r.text[idx:end]
if not sts_url:
_LOGGER.error("Auth2/end did not provide STS redirect.")
return False
r = self._session.get(sts_url, headers=headers, allow_redirects=True)
_LOGGER.debug("STS final URL: %s status=%s", r.url, r.status_code)
if r.status_code != 200:
_LOGGER.error("STS did not complete: status=%s body=%s", r.status_code, r.text[:200])
return False
# Extract serviceToken from cookie jar
self._serviceToken = self._session.cookies.get("serviceToken", domain=".sts.api.io.mi.com")
found = bool(self._serviceToken)
_LOGGER.debug("STS body (trunc)=%s", r.text[:20])
if not found:
_LOGGER.error("Could not parse serviceToken; cannot complete login.")
return False
_LOGGER.debug("STS did not return JSON; assuming 'ok' style response and relying on cookies.")
_LOGGER.debug("extract_service_token: found=%s", found)
# Mirror serviceToken to API domains expected by Mi Cloud
self.install_service_token_cookies(self._serviceToken)
# Update ids from cookies if available
self.userId = self.userId or self._session.cookies.get("userId", domain=".xiaomi.com") or self._session.cookies.get("userId", domain=".sts.api.io.mi.com")
self._cUserId = self._cUserId or self._session.cookies.get("cUserId", domain=".xiaomi.com") or self._session.cookies.get("cUserId", domain=".sts.api.io.mi.com")
return True
def install_service_token_cookies(self, token: str):
for d in [".api.io.mi.com", ".io.mi.com", ".mi.com"]:
self._session.cookies.set("serviceToken", token, domain=d)
self._session.cookies.set("yetAnotherServiceToken", token, domain=d)
class QrCodeXiaomiCloudConnector(XiaomiCloudConnector):
def __init__(self):
super().__init__()
self._cUserId = None
self._pass_token = None
self._location = None
self._qr_image_url = None
self._login_url = None
self._long_polling_url = None
def login(self) -> bool:
if not self.login_step_1():
print_if_interactive(f"{Fore.RED}Unable to get login message.")
return False
if not self.login_step_2():
print_if_interactive(f"{Fore.RED}Unable to get login QR Image.")
return False
if not self.login_step_3():
print_if_interactive(f"{Fore.RED}Unable to login.")
return False
if not self.login_step_4():
print_if_interactive(f"{Fore.RED}Unable to get service token.")
return False
return True
def login_step_1(self) -> bool:
_LOGGER.debug("login_step_1")
url = "https://account.xiaomi.com/longPolling/loginUrl"
data = {
"_qrsize": "480",
"qs": "%3Fsid%3Dxiaomiio%26_json%3Dtrue",
"callback": "https://sts.api.io.mi.com/sts",
"_hasLogo": "false",
"sid": "xiaomiio",
"serviceParam": "",
"_locale": "en_GB",
"_dc": str(int(time.time() * 1000))
}
response = self._session.get(url, params=data)
_LOGGER.debug(response.text)
if response.status_code == 200:
response_data = self.to_json(response.text)
if "qr" in response_data:
self._qr_image_url = response_data["qr"]
self._login_url = response_data["loginUrl"]
self._long_polling_url = response_data["lp"]
self._timeout = response_data["timeout"]
return True
return False
def login_step_2(self) -> bool:
_LOGGER.debug("login_step_2")
url = self._qr_image_url
_LOGGER.debug("login_step_2: Image URL: %s", url)
response = self._session.get(url)
valid: bool = response is not None and response.status_code == 200
if valid:
print_if_interactive(f"{Fore.BLUE}Please scan the following QR code to log in.")
present_image_image(
response.content,
message_url = f"QR code URL: {Fore.BLUE}http://{args.host or '127.0.0.1'}:31415",
message_file_saved = "QR code image saved at: {}",
message_manually_open_file = "Please open {} and scan the QR code."
)
print_if_interactive()
print_if_interactive(f"{Fore.BLUE}Alternatively you can visit the following URL:")
print_if_interactive(f"{Fore.BLUE} {self._login_url}")
print_if_interactive()
return True
else:
_LOGGER.error("login_step_2: HTTP status: %s; Response: %s", response.status_code, response.text[:500])
return False
def login_step_3(self) -> bool:
_LOGGER.debug("login_step_3")
url = self._long_polling_url
_LOGGER.debug("Long polling URL: " + url)
start_time = time.time()
# Start long polling
while True:
try:
response = self._session.get(url, timeout=10)
except requests.exceptions.Timeout:
_LOGGER.debug("Long polling timed out, retrying...")
if time.time() - start_time > self._timeout:
_LOGGER.debug("Long polling timed out after {} seconds.".format(self._timeout))
break
continue
except requests.exceptions.RequestException as e:
_LOGGER.error(f"An error occurred: {e}")
break
if response.status_code == 200:
break
else:
_LOGGER.error("Long polling failed, retrying...")
if response.status_code != 200:
_LOGGER.error("Long polling failed with status code: " + str(response.status_code))
return False
_LOGGER.debug("Login successful!")
_LOGGER.debug("Response data:")
response_data = self.to_json(response.text)
_LOGGER.debug(response_data)
self.userId = response_data["userId"]
self._ssecurity = response_data["ssecurity"]
self._cUserId = response_data["cUserId"]
self._pass_token = response_data["passToken"]
self._location = response_data["location"]
_LOGGER.debug("User ID: " + str(self.userId))
_LOGGER.debug("Ssecurity: " + str(self._ssecurity))
_LOGGER.debug("CUser ID: " + str(self._cUserId))
_LOGGER.debug("Pass token: " + str(self._pass_token))
_LOGGER.debug("Pass token: " + str(self._location))
return True
def login_step_4(self) -> bool:
_LOGGER.debug("login_step_4")
_LOGGER.debug("Fetching service token...")
if not (location := self._location):
_LOGGER.error("No location found.")
return False
response = self._session.get(location, headers={"content-type": "application/x-www-form-urlencoded"})
if response.status_code != 200:
return False
self._serviceToken = response.cookies["serviceToken"]
_LOGGER.debug("Service token: " + str(self._serviceToken))
return True
def print_if_interactive(value: str="") -> None:
if not args.non_interactive:
print(value)
def print_tabbed(value: str, tab: int) -> None:
print_if_interactive(" " * tab + value)
def print_entry(key: str, value: str, tab: int) -> None:
if value:
print_tabbed(f'{Fore.YELLOW}{key + ":": <10}{Style.RESET_ALL}{value}', tab)
def print_banner() -> None:
print_if_interactive(Fore.YELLOW + Style.BRIGHT + r"""
Xiaomi Cloud
___ ____ _ _ ____ _ _ ____ ____ _ _ ___ ____ ____ ____ ___ ____ ____
| | | |_/ |___ |\ | [__ |___ \/ | |__/ |__| | | | | |__/
| |__| | \_ |___ | \| ___] |___ _/\_ | | \ | | |___ | |__| | \
""" + Style.NORMAL +
""" by Piotr Machowski
""")
def start_image_server(image: bytes) -> None:
class ImgHttpHandler(BaseHTTPRequestHandler):
def do_GET(self) -> None:
self.send_response(200)
self.end_headers()
self.wfile.write(image)
def log_message(self, msg, *args) -> None:
_LOGGER.debug(msg, *args)
httpd = HTTPServer(("", 31415), ImgHttpHandler)
_LOGGER.info("server address: %s", httpd.server_address)
_LOGGER.info("hostname: %s", socket.gethostname())
thread = threading.Thread(target = httpd.serve_forever)
thread.daemon = True
thread.start()
def present_image_image(
image_content: bytes,
message_url: str,
message_file_saved: str,
message_manually_open_file: str,
) -> None:
try:
# Try to serve an image file
start_image_server(image_content)
print_if_interactive(message_url)
except Exception as e1:
_LOGGER.debug(e1)
# Save image to a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
tmp.write(image_content)
tmp_path: str = tmp.name
print_if_interactive(message_file_saved.format(tmp_path))
try:
img = Image.open(tmp_path)
img.show()
except Exception as e2:
_LOGGER.debug(e2)
print_if_interactive(message_manually_open_file.format(tmp_path))
def main() -> None:
print_banner()
if args.non_interactive:
connector = PasswordXiaomiCloudConnector()
else:
print_if_interactive("Please select a way to log in:")
print_if_interactive(f" p{Fore.BLUE} - using password")
print_if_interactive(f" q{Fore.BLUE} - using QR code")
log_in_method = ""
while not log_in_method in ["P", "Q"]:
log_in_method = input("p/q: ").upper()
if log_in_method == "P":
connector = PasswordXiaomiCloudConnector()
else:
connector = QrCodeXiaomiCloudConnector()
print_if_interactive()
logged = connector.login()
if logged:
print_if_interactive(f"{Fore.GREEN}Logged in.")
print_if_interactive()
servers_to_check = get_servers_to_check()
print_if_interactive()
output = []
for current_server in servers_to_check:
all_homes = []
homes = connector.get_homes(current_server)
if homes is not None:
for h in homes["result"]["homelist"]:
all_homes.append({"home_id": h["id"], "home_owner": connector.userId})
dev_cnt = connector.get_dev_cnt(current_server)
if dev_cnt is not None:
for h in dev_cnt["result"]["share"]["share_family"]:
all_homes.append({"home_id": h["home_id"], "home_owner": h["home_owner"]})
if len(all_homes) == 0:
print_if_interactive(f'{Fore.RED}No homes found for server "{current_server}".')
for home in all_homes:
devices = connector.get_devices(current_server, home["home_id"], home["home_owner"])
home["devices"] = []
if devices is not None:
if devices["result"]["device_info"] is None or len(devices["result"]["device_info"]) == 0:
print_if_interactive(f'{Fore.RED}No devices found for server "{current_server}" @ home "{home["home_id"]}".')
continue
print_if_interactive(f'Devices found for server "{current_server}" @ home "{home["home_id"]}":')
for device in devices["result"]["device_info"]:
device_data = {**device}
print_tabbed(f"{Fore.BLUE}---------", 3)
if "name" in device:
print_entry("NAME", device["name"], 3)
if "did" in device:
print_entry("ID", device["did"], 3)
if "blt" in device["did"]:
beaconkey = connector.get_beaconkey(current_server, device["did"])
if beaconkey and "result" in beaconkey and "beaconkey" in beaconkey["result"]:
print_entry("BLE KEY", beaconkey["result"]["beaconkey"], 3)
device_data["BLE_DATA"] = beaconkey["result"]
if "mac" in device:
print_entry("MAC", device["mac"], 3)
if "localip" in device:
print_entry("IP", device["localip"], 3)
if "token" in device:
print_entry("TOKEN", device["token"], 3)
if "model" in device:
print_entry("MODEL", device["model"], 3)
home["devices"].append(device_data)
print_tabbed(f"{Fore.BLUE}---------", 3)
print_if_interactive()
else:
print_if_interactive(f"{Fore.RED}Unable to get devices from server {current_server}.")
output.append({"server": current_server, "homes": all_homes})
if args.output:
with open(args.output, "w") as f:
f.write(json.dumps(output, indent=4))
else:
print_if_interactive(f"{Fore.RED}Unable to log in.")
if not args.non_interactive:
print_if_interactive()
print_if_interactive("Press ENTER to finish")
input()
def get_servers_to_check() -> list[str]:
servers_str = ", ".join(SERVERS)
if args.server is not None:
server = args.server
elif args.non_interactive:
server = ""
else:
print_if_interactive(
f"Select server {Fore.BLUE}(one of: {servers_str}; Leave empty to check all available){Style.RESET_ALL}:")
server = input()
while server not in ["", *SERVERS]:
print_if_interactive(f"{Fore.RED}Invalid server provided. Valid values: {servers_str}")
print_if_interactive("Server:")
server = input()
print_if_interactive()
if not server == "":
servers_to_check = [server]
else:
servers_to_check = [*SERVERS]
return servers_to_check
if __name__ == "__main__":
main()