mirror of
https://gitlab.sectorq.eu/jaydee/ansible.git
synced 2026-05-22 13:36:20 +02:00
Compare commits
16 Commits
13b93d7f2b
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 8f57304112 | |||
| 6d496a04af | |||
| 99d4075edd | |||
| 8fb84b3c47 | |||
| e3cd8f5843 | |||
| 68775c50da | |||
| 0dabeb4985 | |||
| f2b0e75ea4 | |||
| 7833d21d12 | |||
| 8383e6062f | |||
| 00ff11eba8 | |||
| 21dd3b2aa6 | |||
| 0795220a91 | |||
| 774d23898d | |||
| 5f0505283e | |||
| a8e1883f22 |
@@ -1,6 +1,8 @@
|
||||
- hosts: datacenter
|
||||
name: Roles
|
||||
gather_facts: false
|
||||
vars_files:
|
||||
- jaydee.yml # vault file
|
||||
roles:
|
||||
- name: zabbix_proxy_service
|
||||
role: zabbix_proxy_service
|
||||
@@ -106,4 +108,7 @@
|
||||
tags: terraform
|
||||
- name: setup_repo_sync
|
||||
role: setup_repo_sync
|
||||
tags: setup_repo_sync
|
||||
tags: setup_repo_sync
|
||||
- name: puppet-agent
|
||||
role: puppet-agent
|
||||
tags: puppet-agent
|
||||
|
||||
+4
-6
@@ -26,14 +26,11 @@ datacenter:
|
||||
ansible_user: root
|
||||
ryzen:
|
||||
hosts:
|
||||
192.168.77.15:
|
||||
ryzen.home.lan:
|
||||
vars:
|
||||
ansible_python_interpreter: auto_silent
|
||||
ansible_ssh_user: root
|
||||
ansible_ssh_pass: lacijaydee
|
||||
ansible_password: lacijaydee
|
||||
ansible_become_user: root
|
||||
ansible_become_password: lacijaydee
|
||||
ansible_ssh_user: jd
|
||||
ansible_ssh_private_key_file: ssh_key.pem
|
||||
omv:
|
||||
hosts:
|
||||
192.168.77.189:
|
||||
@@ -177,6 +174,7 @@ datacenter:
|
||||
desktops:
|
||||
hosts:
|
||||
morefine.home.lan:
|
||||
asus.home.lan:
|
||||
vars:
|
||||
ansible_python_interpreter: /usr/bin/python3
|
||||
ansible_ssh_user: jd
|
||||
|
||||
+26
-25
@@ -1,26 +1,27 @@
|
||||
$ANSIBLE_VAULT;1.1;AES256
|
||||
38393362333061613238333964643062316633323761346333646464373437353630656635323561
|
||||
6433353166343337346433643839363866383831653666370a366231343233626366643466353762
|
||||
39346263313962343730333364643461376436663061323633623537346238316363343735343136
|
||||
6631653665653363660a663136613961396163636163636465333163363537663634386261393537
|
||||
30333233316236343161663136313635353162633930646431613335313264303565616435636662
|
||||
61343363386430303438333934393362643661356636316432396266623762393833333539633166
|
||||
64623230353861356530653263306637323964336231353665346264333463663030366338616330
|
||||
62373461363333643838613034636537353233393137656135386131656130363462653633366336
|
||||
66323236346234616639383066643638303937623938333136623131396334623263653763656131
|
||||
34306366343738663930666536663664613732343566323430353638393662666130393366663861
|
||||
35346532656531326366343963396334393432343931656332663933616131643161363933383738
|
||||
63616164316637626639653835356433613764663361393933346239343866323164643765336636
|
||||
64373764373964613864386133623835303163663933396534303132323630326236336132393063
|
||||
63666264613331306165326232386166633039333030346338383632363739393566383863663761
|
||||
64656332643838633037373030336265663738343563303262386437353066313466393764616231
|
||||
35653063623263666439393635613037323439616630363837353930626632313437303762633433
|
||||
37646664376532343632613162333039326463646231383939373965623434306665383662633861
|
||||
31383335393236313362323762633833366461643264633730636133646335393532376133386432
|
||||
66666534393431333235326139333936616365356434636638346433663566666633306237313264
|
||||
32313031663166353465653863643762313163386137306238383436343631303630643266353035
|
||||
35306636643735396630613538326461656132623931666234666232396238313062343265396666
|
||||
62343766616338383962386165393930396536666663643963383237663236666664303837613135
|
||||
35666234363462616562366333396366353838353763613639653936303039613863323565306263
|
||||
35373266303035613638653338346434616634353534393838646131366432633266353636383433
|
||||
6638
|
||||
66616666346233663164303662323266303431383638623139373163636566636230313862646664
|
||||
3866616633376432666530663963346561613538633563610a653835323733353465306562646631
|
||||
33653763636238323363653439316634623134366266363032353561343136353037323635656661
|
||||
3436333637616362360a316636386463656466653964623166323639323834386535376532343335
|
||||
62303631636135363630343833613963313564386636636561393039326361356232333162333336
|
||||
30306266623030623632366135313335386466393563343030356635646563306434336137396439
|
||||
39666531336233646638643730386137323233653061373330373430666639393230373566626534
|
||||
36643335356566396239396265663738373838643234333438356630633031323635663666666530
|
||||
37363163356136306366333036656162643630613039623730363365636133373264313832623066
|
||||
31393233613665643261366337643332363032303461636138313635333130626139666666376439
|
||||
38373437326164616434363662613535323362363561313962396665356538336333373766626361
|
||||
37626161663238356162326461613634363462623534336632306331356239363562396662396130
|
||||
63626337383232366335643561613766313038643464323836643130626533633339623633326233
|
||||
34393235376565616463386539366638336633343430363639393666633932316266653466393465
|
||||
35303832636562376665303961656134363463376637643438353038343631303164326130323966
|
||||
39346563663062623937633639303739393665343838666137316132633631376230633763323535
|
||||
31343965363965303561356638346130316139303832653863623762393664616335646364643437
|
||||
37336436643362393935343037633730666362636338343466646632646266303930656436313263
|
||||
36613464313735303566613762396261373737316563646266646237386139633663333564346136
|
||||
33353761363663613865623863666132383730396631393563643434346437326365373238666639
|
||||
65626339313264666439653430306331363266383232383132653562666165383436346464663265
|
||||
31643830656136636533363265393362393934343531303362653232646232643563313738613939
|
||||
31323365333461636637356265393665646235643530376331303931623138616637386362326132
|
||||
32656334643864626136353934363433316463303637336363653934623834326337663334346262
|
||||
34376539353230303639633235383635613739343633613361356665373662383866343439383465
|
||||
3137313662333665303064646362616639333031633239626462
|
||||
|
||||
@@ -3,7 +3,7 @@ mkdir /tmp/certgen 2>/dev/null
|
||||
cd /tmp/certgen
|
||||
|
||||
openssl genrsa -aes256 -out ca-key.pem -passout pass:"foobarpwd" 4096
|
||||
openssl req -new -x509 -days 365 -key ca-key.pem -sha256 -out ca.pem -subj "/C=SK/ST=Slovakia/L=SK/O=sectorq.cloud/OU=IT/CN=sectorq.cloud" -passin pass:"foobarpwd"
|
||||
openssl req -new -x509 -days 365 -key ca-key.pem -sha256 -out ca.pem -subj "/C=SK/ST=Slovakia/L=SK/O=sectorq.eu/OU=IT/CN=sectorq.eu" -passin pass:"foobarpwd"
|
||||
openssl genrsa -out server-key.pem 4096
|
||||
if [[ `hostname` == "nas" ]]
|
||||
then
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
- name: Upgrade
|
||||
become: "{{ false if inventory_hostname == 'nas.home.lan' else true }}"
|
||||
ignore_unreachable: true
|
||||
block:
|
||||
- name: Include vault
|
||||
ansible.builtin.include_vars:
|
||||
@@ -7,6 +8,7 @@
|
||||
- name: Include facts
|
||||
ansible.builtin.include_role:
|
||||
name: setup
|
||||
|
||||
when: ansible_facts.architecture is not defined
|
||||
|
||||
- name: Upgrade Debian
|
||||
@@ -29,6 +31,6 @@
|
||||
|
||||
- name: Upgrade flatpack
|
||||
ansible.builtin.command: flatpak update -y
|
||||
when: inventory_hostname == 'morefine.home.lan'
|
||||
when: inventory_hostname in ['morefine.home.lan','ryzen.home.lan','asus.home.lan']
|
||||
register: logo
|
||||
changed_when: "logo.rc == 0"
|
||||
|
||||
Executable
+7
@@ -0,0 +1,7 @@
|
||||
- name: Upgrade
|
||||
become: "{{ false if inventory_hostname == 'nas.home.lan' else true }}"
|
||||
block:
|
||||
- name: Install the dbeaver flatpak
|
||||
community.general.flatpak:
|
||||
name: app/io.dbeaver.DBeaverCommunity/x86_64/stable
|
||||
state: present
|
||||
@@ -0,0 +1,4 @@
|
||||
username: "jd"
|
||||
user_password: "{{ 'l4c1j4yd33Du5lo' | password_hash('sha512') }}"
|
||||
new_root_password: "{{ 'l4c1j4yd33Du5lo' | password_hash('sha512') }}"
|
||||
user_groups: "sudo"
|
||||
@@ -2,9 +2,14 @@
|
||||
become: "{{ 'no' if inventory_hostname == 'nas.home.lan' else 'yes' }}"
|
||||
block:
|
||||
|
||||
- name: Include role proxy_repo
|
||||
ansible.builtin.include_role:
|
||||
name: proxy_repo
|
||||
- name: Disable SELinux
|
||||
ansible.posix.selinux:
|
||||
state: permissive
|
||||
policy: targeted
|
||||
|
||||
# - name: Include role proxy_repo
|
||||
# ansible.builtin.include_role:
|
||||
# name: proxy_repo
|
||||
|
||||
- name: Include role local mirror
|
||||
ansible.builtin.include_role:
|
||||
@@ -12,11 +17,6 @@
|
||||
when: use_local_repo | default(false)
|
||||
|
||||
|
||||
- name: Disable SELinux
|
||||
ansible.posix.selinux:
|
||||
state: permissive
|
||||
policy: targeted
|
||||
|
||||
- name: Disable swap
|
||||
command: swapoff -a
|
||||
when: ansible_swaptotal_mb > 0
|
||||
@@ -58,7 +58,6 @@
|
||||
- lvm2
|
||||
- epel-release
|
||||
- git
|
||||
- helm
|
||||
state: present
|
||||
|
||||
- name: Add Kubernetes repo
|
||||
@@ -115,6 +114,7 @@
|
||||
- kubelet
|
||||
- kubeadm
|
||||
- kubectl
|
||||
- helm
|
||||
disable_excludes: kubernetes
|
||||
state: present
|
||||
register: k8s_install
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCjPi74gAHT2GBbKWWmmUrzzrhKQN3mnz3DTWn02KhbgWs6MRLlTrn2fLqB4hkoVEOat29wAOJ2xuY4aEjGfO6nl/3ka3oqslwxW9GqFCRXVHnF3b+Wly+jkn3tbht+gjFK4x0np1WZbpTI/nR7gElWa774LCHy5i4fm5ISoCHzRMsKd3dVF4YOte6QC9Uu4uQQjpZWksYRjANtXAk5EDEOERz+Dh8RDkXbGYW2vx1ihNQGh53z3c6W6bajGq3qx/+G905MtRgutbHWOOYSE8o2q3vaDkCNA6Fjd3P6gHTPcRjFEfoHc8Vxp0vjR9e6ol8X7D+GEL6g6QN656772Zo3/Qka0rJ/zMnT3gZ2S4i0CzmqDoecbzDCX7ywwuwgvlmJGOswgwwOKP/E0/RnWi63BUFV/fU9o5nr9ZXgkiRa9ZCzubHZhxyN6Z7EhcOVnMbvfV1W5Fn/vbj84YudCLmzrk5qvpQ+qVsMXi1GPIEyFwyg/Afu5JTpNE5+4ViFzTM= jd@morefine
|
||||
@@ -0,0 +1,4 @@
|
||||
username: "jd"
|
||||
user_password: "{{ 'l4c1j4yd33Du5lo' | password_hash('sha512') }}"
|
||||
new_root_password: "{{ 'l4c1j4yd33Du5lo' | password_hash('sha512') }}"
|
||||
user_groups: "sudo"
|
||||
@@ -0,0 +1,38 @@
|
||||
Role Name
|
||||
=========
|
||||
|
||||
A brief description of the role goes here.
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
Any pre-requisites that may not be covered by Ansible itself or the role should be mentioned here. For instance, if the role uses the EC2 module, it may be a good idea to mention in this section that the boto package is required.
|
||||
|
||||
Role Variables
|
||||
--------------
|
||||
|
||||
A description of the settable variables for this role should go here, including any variables that are in defaults/main.yml, vars/main.yml, and any variables that can/should be set via parameters to the role. Any variables that are read from other roles and/or the global scope (ie. hostvars, group vars, etc.) should be mentioned here as well.
|
||||
|
||||
Dependencies
|
||||
------------
|
||||
|
||||
A list of other roles hosted on Galaxy should go here, plus any details in regards to parameters that may need to be set for other roles, or variables that are used from other roles.
|
||||
|
||||
Example Playbook
|
||||
----------------
|
||||
|
||||
Including an example of how to use your role (for instance, with variables passed in as parameters) is always nice for users too:
|
||||
|
||||
- hosts: servers
|
||||
roles:
|
||||
- { role: username.rolename, x: 42 }
|
||||
|
||||
License
|
||||
-------
|
||||
|
||||
BSD
|
||||
|
||||
Author Information
|
||||
------------------
|
||||
|
||||
An optional section for the role authors to include contact information, or a website (HTML is not allowed).
|
||||
@@ -0,0 +1,2 @@
|
||||
---
|
||||
# defaults file for puppet-agent
|
||||
@@ -0,0 +1,2 @@
|
||||
---
|
||||
# handlers file for puppet-agent
|
||||
@@ -0,0 +1,52 @@
|
||||
galaxy_info:
|
||||
author: your name
|
||||
description: your role description
|
||||
company: your company (optional)
|
||||
|
||||
# If the issue tracker for your role is not on github, uncomment the
|
||||
# next line and provide a value
|
||||
# issue_tracker_url: http://example.com/issue/tracker
|
||||
|
||||
# Choose a valid license ID from https://spdx.org - some suggested licenses:
|
||||
# - BSD-3-Clause (default)
|
||||
# - MIT
|
||||
# - GPL-2.0-or-later
|
||||
# - GPL-3.0-only
|
||||
# - Apache-2.0
|
||||
# - CC-BY-4.0
|
||||
license: license (GPL-2.0-or-later, MIT, etc)
|
||||
|
||||
min_ansible_version: 2.1
|
||||
|
||||
# If this a Container Enabled role, provide the minimum Ansible Container version.
|
||||
# min_ansible_container_version:
|
||||
|
||||
#
|
||||
# Provide a list of supported platforms, and for each platform a list of versions.
|
||||
# If you don't wish to enumerate all versions for a particular platform, use 'all'.
|
||||
# To view available platforms and versions (or releases), visit:
|
||||
# https://galaxy.ansible.com/api/v1/platforms/
|
||||
#
|
||||
# platforms:
|
||||
# - name: Fedora
|
||||
# versions:
|
||||
# - all
|
||||
# - 25
|
||||
# - name: SomePlatform
|
||||
# versions:
|
||||
# - all
|
||||
# - 1.0
|
||||
# - 7
|
||||
# - 99.99
|
||||
|
||||
galaxy_tags: []
|
||||
# List tags for your role here, one per line. A tag is a keyword that describes
|
||||
# and categorizes the role. Users find roles by searching for tags. Be sure to
|
||||
# remove the '[]' above, if you add tags to this list.
|
||||
#
|
||||
# NOTE: A tag is limited to a single word comprised of alphanumeric characters.
|
||||
# Maximum 20 tags per role.
|
||||
|
||||
dependencies: []
|
||||
# List your role dependencies here, one per line. Be sure to remove the '[]' above,
|
||||
# if you add dependencies to this list.
|
||||
@@ -0,0 +1,46 @@
|
||||
- name: Install puppet agent
|
||||
become: "{{ false if inventory_hostname == 'nas.home.lan' else true }}"
|
||||
vars:
|
||||
puppet_server: active.home.lan
|
||||
block:
|
||||
# - name: Ensure Puppet repo is present (Debian/Ubuntu)
|
||||
# apt_repository:
|
||||
# repo: "deb http://apt.puppetlabs.com {{ ansible_distribution_release }} puppet6"
|
||||
# state: present
|
||||
# when: ansible_os_family == "Debian"
|
||||
- name: Facts
|
||||
ansible.builtin.setup:
|
||||
when: ansible_facts.architecture is not defined
|
||||
- name: Install Puppet agent package
|
||||
ansible.builtin.package:
|
||||
name: puppet-agent
|
||||
state: present
|
||||
|
||||
- name: Create Puppet configuration directory
|
||||
ansible.builtin.file:
|
||||
path: /etc/puppetlabs/puppet
|
||||
state: directory
|
||||
mode: '0755'
|
||||
|
||||
- name: Deploy puppet.conf
|
||||
ansible.builtin.template:
|
||||
src: puppet.conf.j2
|
||||
dest: /etc/puppet/puppet.conf
|
||||
owner: root
|
||||
group: root
|
||||
mode: '0644'
|
||||
|
||||
- name: Enable and start puppet agent
|
||||
ansible.builtin.systemd:
|
||||
name: puppet
|
||||
enabled: true
|
||||
state: started
|
||||
|
||||
- name: Trigger puppet agent run once
|
||||
ansible.builtin.command: /usr/bin/puppet agent -t
|
||||
register: puppet_run
|
||||
changed_when: puppet_run.rc != 0
|
||||
|
||||
- name: Debug puppet run output
|
||||
ansible.builtin.debug:
|
||||
var: puppet_run.stdout_lines
|
||||
@@ -0,0 +1,5 @@
|
||||
[main]
|
||||
server = {{ puppet_server }}
|
||||
certname = {{ inventory_hostname }}
|
||||
environment = production
|
||||
runinterval = 10m
|
||||
@@ -0,0 +1,2 @@
|
||||
localhost
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
---
|
||||
- hosts: localhost
|
||||
remote_user: root
|
||||
roles:
|
||||
- puppet-agent
|
||||
@@ -0,0 +1,3 @@
|
||||
---
|
||||
# vars file for puppet-agent
|
||||
puppet_server: active.home.lan
|
||||
Executable
+40
@@ -0,0 +1,40 @@
|
||||
- name: Set banner
|
||||
become: "{{ false if inventory_hostname == 'nas.home.lan' else true }}"
|
||||
block:
|
||||
- name: Install packages
|
||||
ansible.builtin.apt:
|
||||
name:
|
||||
- figlet
|
||||
- toilet
|
||||
|
||||
- name: Create Banner
|
||||
ansible.builtin.command: |
|
||||
figlet -c {{ (inventory_hostname | split('.'))[0] }} -f slant
|
||||
register: logo
|
||||
changed_when: "logo.rc == 0"
|
||||
|
||||
- name: Creating a file with content
|
||||
ansible.builtin.copy:
|
||||
dest: "/etc/motd"
|
||||
content: |
|
||||
{{ logo.stdout }}
|
||||
owner: 0
|
||||
group: 0
|
||||
mode: "0777"
|
||||
|
||||
- name: Reconfigure sshd
|
||||
ansible.builtin.lineinfile:
|
||||
path: /etc/ssh/sshd_config
|
||||
regexp: "^Banner.* "
|
||||
line: "#Banner /etc/banner"
|
||||
|
||||
- name: Reconfigure sshd
|
||||
ansible.builtin.lineinfile:
|
||||
path: /etc/ssh/sshd_config
|
||||
regexp: "^#PrintLastLog.* "
|
||||
line: "PrintLastLog no"
|
||||
|
||||
- name: Sshd
|
||||
ansible.builtin.service:
|
||||
name: ssh.service
|
||||
state: restarted
|
||||
@@ -1,6 +1,9 @@
|
||||
- name: SSH config Setup
|
||||
become: "{{ false if inventory_hostname == 'nas.home.lan' else true }}"
|
||||
block:
|
||||
- name: Include vault
|
||||
ansible.builtin.include_vars:
|
||||
file: jaydee.yml
|
||||
- name: Upload config
|
||||
ansible.builtin.copy:
|
||||
src: config
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJSABeQ3x7nM3HBil1LpGYLrdz5NbXVl6l6dNMr0lnwZ jd@ryzen
|
||||
@@ -18,8 +18,8 @@
|
||||
when: inventory_hostname != 'nas.home.lan'
|
||||
- name: Upload key
|
||||
ansible.builtin.copy:
|
||||
src: id_rsa.pub
|
||||
dest: /home/jd/.ssh/id_rsa.pub
|
||||
src: id_ed25519_homelab.pub
|
||||
dest: /home/jd/.ssh/id_ed25519_homelab.pub
|
||||
mode: '0600'
|
||||
owner: jd
|
||||
group: jd
|
||||
@@ -28,4 +28,4 @@
|
||||
ansible.posix.authorized_key:
|
||||
user: jd
|
||||
state: present
|
||||
key: "{{ lookup('file', '/home/jd/.ssh/id_rsa.pub') }}"
|
||||
key: "{{ lookup('file', '/home/jd/.ssh/id_ed25519_homelab.pub') }}"
|
||||
|
||||
Binary file not shown.
@@ -0,0 +1,5 @@
|
||||
requests
|
||||
pycryptodome
|
||||
charset-normalizer
|
||||
Pillow
|
||||
colorama
|
||||
@@ -0,0 +1,935 @@
|
||||
from abc import ABC, abstractmethod
|
||||
import argparse
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import socket
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
from getpass import getpass
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
import requests
|
||||
from colorama import Fore, Style, init
|
||||
|
||||
try:
|
||||
from Crypto.Cipher import ARC4
|
||||
except ModuleNotFoundError:
|
||||
from Cryptodome.Cipher import ARC4
|
||||
|
||||
from PIL import Image
|
||||
|
||||
if sys.platform != "win32":
|
||||
import readline
|
||||
|
||||
SERVERS = ["cn", "de", "us", "ru", "tw", "sg", "in", "i2"]
|
||||
NAME_TO_LEVEL = {
|
||||
"CRITICAL": logging.CRITICAL,
|
||||
"FATAL": logging.FATAL,
|
||||
"ERROR": logging.ERROR,
|
||||
"WARN": logging.WARNING,
|
||||
"WARNING": logging.WARNING,
|
||||
"INFO": logging.INFO,
|
||||
"DEBUG": logging.DEBUG,
|
||||
"NOTSET": logging.NOTSET,
|
||||
}
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-ni", "--non_interactive", required=False, help="Non-interactive mode", action="store_true")
|
||||
parser.add_argument("-u", "--username", required=False, help="Username")
|
||||
parser.add_argument("-p", "--password", required=False, help="Password")
|
||||
parser.add_argument("-s", "--server", required=False, help="Server", choices=[*SERVERS, ""])
|
||||
parser.add_argument("-l", "--log_level", required=False, help="Log level", default="CRITICAL", choices=list(NAME_TO_LEVEL.keys()))
|
||||
parser.add_argument("-o", "--output", required=False, help="Output file")
|
||||
parser.add_argument("--host", required=False, help="Host")
|
||||
args = parser.parse_args()
|
||||
if args.non_interactive and (not args.username or not args.password):
|
||||
parser.error("You need to specify username and password or run as interactive.")
|
||||
|
||||
init(autoreset=True)
|
||||
|
||||
class ColorFormatter(logging.Formatter):
|
||||
COLORS = {
|
||||
"CRITICAL": Fore.RED + Style.BRIGHT,
|
||||
"FATAL": Fore.RED + Style.BRIGHT,
|
||||
"ERROR": Fore.RED,
|
||||
"WARN": Fore.YELLOW,
|
||||
"WARNING": Fore.YELLOW,
|
||||
"INFO": Fore.GREEN,
|
||||
"DEBUG": Fore.BLUE,
|
||||
}
|
||||
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
color = self.COLORS.get(record.levelname, "")
|
||||
return color + logging.Formatter.format(self, record)
|
||||
|
||||
|
||||
class ColorLogger(logging.Logger):
|
||||
def __init__(self, name: str) -> None:
|
||||
level = NAME_TO_LEVEL[args.log_level.upper()]
|
||||
logging.Logger.__init__(self, name, level)
|
||||
color_formatter = ColorFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
|
||||
handler = logging.StreamHandler(sys.stdout)
|
||||
handler.setFormatter(color_formatter)
|
||||
self.addHandler(handler)
|
||||
|
||||
logging.setLoggerClass(ColorLogger)
|
||||
_LOGGER = logging.getLogger("token_extractor")
|
||||
|
||||
|
||||
class XiaomiCloudConnector(ABC):
|
||||
|
||||
def __init__(self):
|
||||
self._agent = self.generate_agent()
|
||||
self._device_id = self.generate_device_id()
|
||||
self._session = requests.session()
|
||||
self._ssecurity = None
|
||||
self.userId = None
|
||||
self._serviceToken = None
|
||||
|
||||
@abstractmethod
|
||||
def login(self) -> bool:
|
||||
pass
|
||||
|
||||
def get_homes(self, country):
|
||||
url = self.get_api_url(country) + "/v2/homeroom/gethome"
|
||||
params = {
|
||||
"data": '{"fg": true, "fetch_share": true, "fetch_share_dev": true, "limit": 300, "app_ver": 7}'}
|
||||
return self.execute_api_call_encrypted(url, params)
|
||||
|
||||
def get_devices(self, country, home_id, owner_id):
|
||||
url = self.get_api_url(country) + "/v2/home/home_device_list"
|
||||
params = {
|
||||
"data": '{"home_owner": ' + str(owner_id) +
|
||||
',"home_id": ' + str(home_id) +
|
||||
', "limit": 200, "get_split_device": true, "support_smart_home": true}'
|
||||
}
|
||||
return self.execute_api_call_encrypted(url, params)
|
||||
|
||||
def get_dev_cnt(self, country):
|
||||
url = self.get_api_url(country) + "/v2/user/get_device_cnt"
|
||||
params = {
|
||||
"data": '{ "fetch_own": true, "fetch_share": true}'
|
||||
}
|
||||
return self.execute_api_call_encrypted(url, params)
|
||||
|
||||
def get_beaconkey(self, country, did):
|
||||
url = self.get_api_url(country) + "/v2/device/blt_get_beaconkey"
|
||||
params = {
|
||||
"data": '{"did":"' + did + '","pdid":1}'
|
||||
}
|
||||
return self.execute_api_call_encrypted(url, params)
|
||||
|
||||
def execute_api_call_encrypted(self, url, params):
|
||||
headers = {
|
||||
"Accept-Encoding": "identity",
|
||||
"User-Agent": self._agent,
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
"x-xiaomi-protocal-flag-cli": "PROTOCAL-HTTP2",
|
||||
"MIOT-ENCRYPT-ALGORITHM": "ENCRYPT-RC4",
|
||||
}
|
||||
cookies = {
|
||||
"userId": str(self.userId),
|
||||
"yetAnotherServiceToken": str(self._serviceToken),
|
||||
"serviceToken": str(self._serviceToken),
|
||||
"locale": "en_GB",
|
||||
"timezone": "GMT+02:00",
|
||||
"is_daylight": "1",
|
||||
"dst_offset": "3600000",
|
||||
"channel": "MI_APP_STORE"
|
||||
}
|
||||
millis = round(time.time() * 1000)
|
||||
nonce = self.generate_nonce(millis)
|
||||
signed_nonce = self.signed_nonce(nonce)
|
||||
fields = self.generate_enc_params(url, "POST", signed_nonce, nonce, params, self._ssecurity)
|
||||
response = self._session.post(url, headers=headers, cookies=cookies, params=fields)
|
||||
if response.status_code == 200:
|
||||
decoded = self.decrypt_rc4(self.signed_nonce(fields["_nonce"]), response.text)
|
||||
return json.loads(decoded)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def get_api_url(country):
|
||||
return "https://" + ("" if country == "cn" else (country + ".")) + "api.io.mi.com/app"
|
||||
|
||||
def signed_nonce(self, nonce):
|
||||
hash_object = hashlib.sha256(base64.b64decode(self._ssecurity) + base64.b64decode(nonce))
|
||||
return base64.b64encode(hash_object.digest()).decode("utf-8")
|
||||
|
||||
@staticmethod
|
||||
def signed_nonce_sec(nonce, ssecurity):
|
||||
hash_object = hashlib.sha256(base64.b64decode(ssecurity) + base64.b64decode(nonce))
|
||||
return base64.b64encode(hash_object.digest()).decode("utf-8")
|
||||
|
||||
@staticmethod
|
||||
def generate_nonce(millis):
|
||||
nonce_bytes = os.urandom(8) + (int(millis / 60000)).to_bytes(4, byteorder="big")
|
||||
return base64.b64encode(nonce_bytes).decode()
|
||||
|
||||
@staticmethod
|
||||
def generate_agent():
|
||||
agent_id = "".join(
|
||||
map(lambda i: chr(i), [random.randint(65, 69) for _ in range(13)])
|
||||
)
|
||||
random_text = "".join(map(lambda i: chr(i), [random.randint(97, 122) for _ in range(18)]))
|
||||
return f"{random_text}-{agent_id} APP/com.xiaomi.mihome APPV/10.5.201"
|
||||
|
||||
@staticmethod
|
||||
def generate_device_id():
|
||||
return "".join(map(lambda i: chr(i), [random.randint(97, 122) for _ in range(6)]))
|
||||
|
||||
@staticmethod
|
||||
def generate_signature(url, signed_nonce, nonce, params):
|
||||
signature_params = [url.split("com")[1], signed_nonce, nonce]
|
||||
for k, v in params.items():
|
||||
signature_params.append(f"{k}={v}")
|
||||
signature_string = "&".join(signature_params)
|
||||
signature = hmac.new(base64.b64decode(signed_nonce), msg=signature_string.encode(), digestmod=hashlib.sha256)
|
||||
return base64.b64encode(signature.digest()).decode()
|
||||
|
||||
@staticmethod
|
||||
def generate_enc_signature(url, method, signed_nonce, params):
|
||||
signature_params = [str(method).upper(), url.split("com")[1].replace("/app/", "/")]
|
||||
for k, v in params.items():
|
||||
signature_params.append(f"{k}={v}")
|
||||
signature_params.append(signed_nonce)
|
||||
signature_string = "&".join(signature_params)
|
||||
return base64.b64encode(hashlib.sha1(signature_string.encode("utf-8")).digest()).decode()
|
||||
|
||||
@staticmethod
|
||||
def generate_enc_params(url, method, signed_nonce, nonce, params, ssecurity):
|
||||
params["rc4_hash__"] = XiaomiCloudConnector.generate_enc_signature(url, method, signed_nonce, params)
|
||||
for k, v in params.items():
|
||||
params[k] = XiaomiCloudConnector.encrypt_rc4(signed_nonce, v)
|
||||
params.update({
|
||||
"signature": XiaomiCloudConnector.generate_enc_signature(url, method, signed_nonce, params),
|
||||
"ssecurity": ssecurity,
|
||||
"_nonce": nonce,
|
||||
})
|
||||
return params
|
||||
|
||||
@staticmethod
|
||||
def to_json(response_text):
|
||||
return json.loads(response_text.replace("&&&START&&&", ""))
|
||||
|
||||
@staticmethod
|
||||
def encrypt_rc4(password, payload):
|
||||
r = ARC4.new(base64.b64decode(password))
|
||||
r.encrypt(bytes(1024))
|
||||
return base64.b64encode(r.encrypt(payload.encode())).decode()
|
||||
|
||||
@staticmethod
|
||||
def decrypt_rc4(password, payload):
|
||||
r = ARC4.new(base64.b64decode(password))
|
||||
r.encrypt(bytes(1024))
|
||||
return r.encrypt(base64.b64decode(payload))
|
||||
|
||||
|
||||
class PasswordXiaomiCloudConnector(XiaomiCloudConnector):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._sign = None
|
||||
self._cUserId = None
|
||||
self._passToken = None
|
||||
self._location = None
|
||||
self._code = None
|
||||
|
||||
def login(self) -> bool:
|
||||
if args.username:
|
||||
self._username = args.username
|
||||
else:
|
||||
print_if_interactive(f"Username {Fore.BLUE}(email, phone number or user ID){Style.RESET_ALL}:")
|
||||
self._username = input()
|
||||
if args.password:
|
||||
self._password = args.password
|
||||
else:
|
||||
print_if_interactive(f"Password {Fore.BLUE}(not displayed for privacy reasons){Style.RESET_ALL}:")
|
||||
self._password = getpass("")
|
||||
|
||||
print_if_interactive()
|
||||
print_if_interactive(f"{Fore.BLUE}Logging in...")
|
||||
print_if_interactive()
|
||||
|
||||
self._session.cookies.set("sdkVersion", "accountsdk-18.8.15", domain="mi.com")
|
||||
self._session.cookies.set("sdkVersion", "accountsdk-18.8.15", domain="xiaomi.com")
|
||||
self._session.cookies.set("deviceId", self._device_id, domain="mi.com")
|
||||
self._session.cookies.set("deviceId", self._device_id, domain="xiaomi.com")
|
||||
|
||||
if not self.login_step_1():
|
||||
print_if_interactive(f"{Fore.RED}Invalid username.")
|
||||
return False
|
||||
|
||||
if not self.login_step_2():
|
||||
print_if_interactive(f"{Fore.RED}Invalid login or password.")
|
||||
return False
|
||||
|
||||
if self._location and not self._serviceToken and not self.login_step_3():
|
||||
print_if_interactive(f"{Fore.RED}Unable to get service token.")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def login_step_1(self):
|
||||
_LOGGER.debug("login_step_1")
|
||||
url = "https://account.xiaomi.com/pass/serviceLogin?sid=xiaomiio&_json=true"
|
||||
headers = {
|
||||
"User-Agent": self._agent,
|
||||
"Content-Type": "application/x-www-form-urlencoded"
|
||||
}
|
||||
cookies = {
|
||||
"userId": self._username
|
||||
}
|
||||
response = self._session.get(url, headers=headers, cookies=cookies)
|
||||
_LOGGER.debug(response.text)
|
||||
json_resp = self.to_json(response.text)
|
||||
if response.status_code == 200:
|
||||
if "_sign" in json_resp:
|
||||
self._sign = json_resp["_sign"]
|
||||
return True
|
||||
elif "ssecurity" in json_resp:
|
||||
self._ssecurity = json_resp["ssecurity"]
|
||||
self.userId = json_resp["userId"]
|
||||
self._cUserId = json_resp["cUserId"]
|
||||
self._passToken = json_resp["passToken"]
|
||||
self._location = json_resp["location"]
|
||||
self._code = json_resp["code"]
|
||||
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def login_step_2(self) -> bool:
|
||||
_LOGGER.debug("login_step_2")
|
||||
url: str = "https://account.xiaomi.com/pass/serviceLoginAuth2"
|
||||
headers: dict = {
|
||||
"User-Agent": self._agent,
|
||||
"Content-Type": "application/x-www-form-urlencoded"
|
||||
}
|
||||
fields: dict = {
|
||||
"sid": "xiaomiio",
|
||||
"hash": hashlib.md5(str.encode(self._password)).hexdigest().upper(),
|
||||
"callback": "https://sts.api.io.mi.com/sts",
|
||||
"qs": "%3Fsid%3Dxiaomiio%26_json%3Dtrue",
|
||||
"user": self._username,
|
||||
"_sign": self._sign,
|
||||
"_json": "true"
|
||||
}
|
||||
_LOGGER.debug("login_step_2: URL: %s", url)
|
||||
_LOGGER.debug("login_step_2: Fields: %s", fields)
|
||||
|
||||
response = self._session.post(url, headers=headers, params=fields, allow_redirects=False)
|
||||
_LOGGER.debug("login_step_2: Response text: %s", response.text)
|
||||
|
||||
valid: bool = response is not None and response.status_code == 200
|
||||
|
||||
if valid:
|
||||
json_resp: dict = self.to_json(response.text)
|
||||
if "captchaUrl" in json_resp and json_resp["captchaUrl"] is not None:
|
||||
if args.non_interactive:
|
||||
parser.error("Captcha solution required, rerun in interactive mode")
|
||||
captcha_code: str = self.handle_captcha(json_resp["captchaUrl"])
|
||||
if not captcha_code:
|
||||
_LOGGER.debug("Could not solve captcha.")
|
||||
return False
|
||||
# Add captcha code to the fields and retry
|
||||
fields["captCode"] = captcha_code
|
||||
_LOGGER.debug("Retrying login with captcha.")
|
||||
response = self._session.post(url, headers=headers, params=fields, allow_redirects=False)
|
||||
_LOGGER.debug("login_step_2: Retry Response text: %s", response.text[:1000])
|
||||
if response is not None and response.status_code == 200:
|
||||
json_resp = self.to_json(response.text)
|
||||
else:
|
||||
_LOGGER.error("Login failed even after captcha.")
|
||||
return False
|
||||
if "code" in json_resp and json_resp["code"] == 87001:
|
||||
print_if_interactive("Invalid captcha.")
|
||||
return False
|
||||
|
||||
valid = "ssecurity" in json_resp and len(str(json_resp["ssecurity"])) > 4
|
||||
if valid:
|
||||
self._ssecurity = json_resp["ssecurity"]
|
||||
self.userId = json_resp.get("userId", None)
|
||||
self._cUserId = json_resp.get("cUserId", None)
|
||||
self._passToken = json_resp.get("passToken", None)
|
||||
self._location = json_resp.get("location", None)
|
||||
self._code = json_resp.get("code", None)
|
||||
else:
|
||||
if "notificationUrl" in json_resp:
|
||||
if args.non_interactive:
|
||||
parser.error("2FA solution required, rerun in interactive mode")
|
||||
verify_url = json_resp["notificationUrl"]
|
||||
return self.do_2fa_email_flow(verify_url)
|
||||
else:
|
||||
_LOGGER.error("login_step_2: Login failed, server returned: %s", json_resp)
|
||||
else:
|
||||
_LOGGER.error("login_step_2: HTTP status: %s; Response: %s", response.status_code, response.text[:500])
|
||||
return valid
|
||||
|
||||
def login_step_3(self):
|
||||
_LOGGER.debug("login_step_3")
|
||||
headers = {
|
||||
"User-Agent": self._agent,
|
||||
"Content-Type": "application/x-www-form-urlencoded"
|
||||
}
|
||||
response = self._session.get(self._location, headers=headers)
|
||||
_LOGGER.debug(response.text)
|
||||
if response.status_code == 200:
|
||||
self._serviceToken = response.cookies.get("serviceToken")
|
||||
return response.status_code == 200
|
||||
|
||||
def handle_captcha(self, captcha_url: str) -> str:
|
||||
# Full URL in case it's relative
|
||||
if captcha_url.startswith("/"):
|
||||
captcha_url = "https://account.xiaomi.com" + captcha_url
|
||||
|
||||
_LOGGER.debug("Downloading captcha image from: %s", captcha_url)
|
||||
response = self._session.get(captcha_url, stream=False)
|
||||
if response.status_code != 200:
|
||||
_LOGGER.error("Unable to fetch captcha image.")
|
||||
return ""
|
||||
|
||||
print_if_interactive(f"{Fore.YELLOW}Captcha verification required.")
|
||||
present_image_image(
|
||||
response.content,
|
||||
message_url = f"Image URL: {Fore.BLUE}http://{args.host or '127.0.0.1'}:31415",
|
||||
message_file_saved = "Captcha image saved at: {}",
|
||||
message_manually_open_file = "Please open {} and solve the captcha."
|
||||
)
|
||||
|
||||
# Ask user for a captcha solution
|
||||
print_if_interactive(f"Enter captcha as shown in the image {Fore.BLUE}(case-sensitive){Style.RESET_ALL}:")
|
||||
captcha_solution: str = input().strip()
|
||||
print_if_interactive()
|
||||
return captcha_solution
|
||||
|
||||
|
||||
def do_2fa_email_flow(self, notification_url: str) -> bool:
|
||||
"""
|
||||
Handles the email-based 2FA flow and extracts ssecurity + serviceToken.
|
||||
Robust to cases where verifyEmail returns non-JSON/empty body.
|
||||
"""
|
||||
# 1) Open notificationUrl (authStart)
|
||||
headers = {
|
||||
"User-Agent": self._agent,
|
||||
"Content-Type": "application/x-www-form-urlencoded"
|
||||
}
|
||||
_LOGGER.debug("Opening notificationUrl (authStart): %s", notification_url)
|
||||
r = self._session.get(notification_url, headers=headers)
|
||||
_LOGGER.debug("authStart final URL: %s status=%s", r.url, r.status_code)
|
||||
|
||||
# 2) Fetch identity options (list)
|
||||
context = parse_qs(urlparse(notification_url).query)["context"][0]
|
||||
list_params = {
|
||||
"sid": "xiaomiio",
|
||||
"context": context,
|
||||
"_locale": "en_US"
|
||||
}
|
||||
_LOGGER.debug("GET /identity/list params=%s", list_params)
|
||||
r = self._session.get("https://account.xiaomi.com/identity/list", params=list_params, headers=headers)
|
||||
_LOGGER.debug("identity/list status=%s", r.status_code)
|
||||
|
||||
# 3) Request email ticket
|
||||
send_params = {
|
||||
"_dc": str(int(time.time() * 1000)),
|
||||
"sid": "xiaomiio",
|
||||
"context": list_params["context"],
|
||||
"mask": "0",
|
||||
"_locale": "en_US"
|
||||
}
|
||||
send_data = {
|
||||
"retry": "0",
|
||||
"icode": "",
|
||||
"_json": "true",
|
||||
"ick": self._session.cookies.get("ick", "")
|
||||
}
|
||||
_LOGGER.debug("sendEmailTicket POST url=https://account.xiaomi.com/identity/auth/sendEmailTicket params=%s", send_params)
|
||||
_LOGGER.debug("sendEmailTicket data=%s", send_data)
|
||||
r = self._session.post("https://account.xiaomi.com/identity/auth/sendEmailTicket",
|
||||
params=send_params, data=send_data, headers=headers)
|
||||
try:
|
||||
jr = r.json()
|
||||
except Exception:
|
||||
jr = {}
|
||||
_LOGGER.debug("sendEmailTicket response status=%s json=%s", r.status_code, jr)
|
||||
|
||||
# 4) Ask user for the email code and verify
|
||||
if args.non_interactive:
|
||||
parser.error("Email verification code required, rerun without --non_interactive")
|
||||
|
||||
print_if_interactive(f"{Fore.YELLOW}Two factor authentication required, please provide the code from the email.")
|
||||
print_if_interactive()
|
||||
print_if_interactive("2FA Code:")
|
||||
code = input().strip()
|
||||
print_if_interactive()
|
||||
|
||||
verify_params = {
|
||||
"_flag": "8",
|
||||
"_json": "true",
|
||||
"sid": "xiaomiio",
|
||||
"context": list_params["context"],
|
||||
"mask": "0",
|
||||
"_locale": "en_US"
|
||||
}
|
||||
verify_data = {
|
||||
"_flag": "8",
|
||||
"ticket": code,
|
||||
"trust": "false",
|
||||
"_json": "true",
|
||||
"ick": self._session.cookies.get("ick", "")
|
||||
}
|
||||
r = self._session.post("https://account.xiaomi.com/identity/auth/verifyEmail",
|
||||
params=verify_params, data=verify_data, headers=headers)
|
||||
if r.status_code != 200:
|
||||
_LOGGER.error("verifyEmail failed: status=%s body=%s", r.status_code, r.text[:500])
|
||||
return False
|
||||
|
||||
try:
|
||||
jr = r.json()
|
||||
_LOGGER.debug("verifyEmail response status=%s json=%s", r.status_code, jr)
|
||||
finish_loc = jr.get("location")
|
||||
except Exception:
|
||||
# Non-JSON or empty; try to extract from headers or body
|
||||
_LOGGER.debug("verifyEmail returned non-JSON, attempting fallback extraction.")
|
||||
finish_loc = r.headers.get("Location")
|
||||
if not finish_loc and r.text:
|
||||
m = re.search(r'https://account\.xiaomi\.com/identity/result/check\?[^"\']+', r.text)
|
||||
if m:
|
||||
finish_loc = m.group(0)
|
||||
|
||||
# Fallback: directly hit result/check using existing identity_session/context
|
||||
if not finish_loc:
|
||||
_LOGGER.debug("Using fallback call to /identity/result/check")
|
||||
r0 = self._session.get(
|
||||
"https://account.xiaomi.com/identity/result/check",
|
||||
params={"sid": "xiaomiio", "context": context, "_locale": "en_US"},
|
||||
headers=headers,
|
||||
allow_redirects=False
|
||||
)
|
||||
_LOGGER.debug("result/check (fallback) status=%s hop-> %s", r0.status_code, r0.headers.get("Location"))
|
||||
if r0.status_code in (301, 302) and r0.headers.get("Location"):
|
||||
finish_loc = r0.url if "serviceLoginAuth2/end" in r0.url else r0.headers["Location"]
|
||||
|
||||
if not finish_loc:
|
||||
_LOGGER.error("Unable to determine finish location after verifyEmail.")
|
||||
return False
|
||||
|
||||
# First hop: GET identity/result/check (do NOT follow redirects to inspect Location)
|
||||
if "identity/result/check" in finish_loc:
|
||||
r = self._session.get(finish_loc, headers=headers, allow_redirects=False)
|
||||
_LOGGER.debug("result/check status=%s hop-> %s", r.status_code, r.headers.get("Location"))
|
||||
end_url = r.headers.get("Location")
|
||||
else:
|
||||
end_url = finish_loc
|
||||
|
||||
if not end_url:
|
||||
_LOGGER.error("Could not find Auth2/end URL in finish chain.")
|
||||
return False
|
||||
|
||||
# 6) Call Auth2/end WITHOUT redirects to capture 'extension-pragma' header containing ssecurity
|
||||
r = self._session.get(end_url, headers=headers, allow_redirects=False)
|
||||
_LOGGER.debug("Auth2/end status=%s", r.status_code)
|
||||
_LOGGER.debug("Auth2/end body(trunc)=%s", r.text[:200])
|
||||
# Some servers return 200 first (HTML 'Tips' page), then 302 on next call.
|
||||
if r.status_code == 200 and "Xiaomi Account - Tips" in r.text:
|
||||
r = self._session.get(end_url, headers=headers, allow_redirects=False)
|
||||
_LOGGER.debug("Auth2/end(second) status=%s", r.status_code)
|
||||
|
||||
ext_prag = r.headers.get("extension-pragma")
|
||||
if ext_prag:
|
||||
try:
|
||||
ep_json = json.loads(ext_prag)
|
||||
ssec = ep_json.get("ssecurity")
|
||||
psec = ep_json.get("psecurity")
|
||||
_LOGGER.debug("extension-pragma present. ssecurity=%s psecurity=%s", ssec, psec)
|
||||
if ssec:
|
||||
self._ssecurity = ssec
|
||||
except Exception as e:
|
||||
_LOGGER.debug("Failed to parse extension-pragma: %s", e)
|
||||
|
||||
if not self._ssecurity:
|
||||
_LOGGER.error("extension-pragma header missing ssecurity; cannot continue.")
|
||||
return False
|
||||
|
||||
# 7) Find STS redirect and visit it (to set serviceToken cookie)
|
||||
sts_url = r.headers.get("Location")
|
||||
if not sts_url and r.text:
|
||||
idx = r.text.find("https://sts.api.io.mi.com/sts")
|
||||
if idx != -1:
|
||||
end = r.text.find('"', idx)
|
||||
if end == -1:
|
||||
end = idx + 300
|
||||
sts_url = r.text[idx:end]
|
||||
if not sts_url:
|
||||
_LOGGER.error("Auth2/end did not provide STS redirect.")
|
||||
return False
|
||||
|
||||
r = self._session.get(sts_url, headers=headers, allow_redirects=True)
|
||||
_LOGGER.debug("STS final URL: %s status=%s", r.url, r.status_code)
|
||||
if r.status_code != 200:
|
||||
_LOGGER.error("STS did not complete: status=%s body=%s", r.status_code, r.text[:200])
|
||||
return False
|
||||
|
||||
# Extract serviceToken from cookie jar
|
||||
self._serviceToken = self._session.cookies.get("serviceToken", domain=".sts.api.io.mi.com")
|
||||
found = bool(self._serviceToken)
|
||||
_LOGGER.debug("STS body (trunc)=%s", r.text[:20])
|
||||
if not found:
|
||||
_LOGGER.error("Could not parse serviceToken; cannot complete login.")
|
||||
return False
|
||||
_LOGGER.debug("STS did not return JSON; assuming 'ok' style response and relying on cookies.")
|
||||
_LOGGER.debug("extract_service_token: found=%s", found)
|
||||
|
||||
# Mirror serviceToken to API domains expected by Mi Cloud
|
||||
self.install_service_token_cookies(self._serviceToken)
|
||||
|
||||
# Update ids from cookies if available
|
||||
self.userId = self.userId or self._session.cookies.get("userId", domain=".xiaomi.com") or self._session.cookies.get("userId", domain=".sts.api.io.mi.com")
|
||||
self._cUserId = self._cUserId or self._session.cookies.get("cUserId", domain=".xiaomi.com") or self._session.cookies.get("cUserId", domain=".sts.api.io.mi.com")
|
||||
return True
|
||||
|
||||
def install_service_token_cookies(self, token: str):
|
||||
for d in [".api.io.mi.com", ".io.mi.com", ".mi.com"]:
|
||||
self._session.cookies.set("serviceToken", token, domain=d)
|
||||
self._session.cookies.set("yetAnotherServiceToken", token, domain=d)
|
||||
|
||||
|
||||
class QrCodeXiaomiCloudConnector(XiaomiCloudConnector):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._cUserId = None
|
||||
self._pass_token = None
|
||||
self._location = None
|
||||
self._qr_image_url = None
|
||||
self._login_url = None
|
||||
self._long_polling_url = None
|
||||
|
||||
def login(self) -> bool:
|
||||
|
||||
if not self.login_step_1():
|
||||
print_if_interactive(f"{Fore.RED}Unable to get login message.")
|
||||
return False
|
||||
|
||||
if not self.login_step_2():
|
||||
print_if_interactive(f"{Fore.RED}Unable to get login QR Image.")
|
||||
return False
|
||||
|
||||
if not self.login_step_3():
|
||||
print_if_interactive(f"{Fore.RED}Unable to login.")
|
||||
return False
|
||||
|
||||
if not self.login_step_4():
|
||||
print_if_interactive(f"{Fore.RED}Unable to get service token.")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def login_step_1(self) -> bool:
|
||||
_LOGGER.debug("login_step_1")
|
||||
url = "https://account.xiaomi.com/longPolling/loginUrl"
|
||||
data = {
|
||||
"_qrsize": "480",
|
||||
"qs": "%3Fsid%3Dxiaomiio%26_json%3Dtrue",
|
||||
"callback": "https://sts.api.io.mi.com/sts",
|
||||
"_hasLogo": "false",
|
||||
"sid": "xiaomiio",
|
||||
"serviceParam": "",
|
||||
"_locale": "en_GB",
|
||||
"_dc": str(int(time.time() * 1000))
|
||||
}
|
||||
|
||||
response = self._session.get(url, params=data)
|
||||
_LOGGER.debug(response.text)
|
||||
|
||||
if response.status_code == 200:
|
||||
response_data = self.to_json(response.text)
|
||||
if "qr" in response_data:
|
||||
self._qr_image_url = response_data["qr"]
|
||||
self._login_url = response_data["loginUrl"]
|
||||
self._long_polling_url = response_data["lp"]
|
||||
self._timeout = response_data["timeout"]
|
||||
return True
|
||||
return False
|
||||
|
||||
def login_step_2(self) -> bool:
|
||||
_LOGGER.debug("login_step_2")
|
||||
url = self._qr_image_url
|
||||
_LOGGER.debug("login_step_2: Image URL: %s", url)
|
||||
|
||||
response = self._session.get(url)
|
||||
|
||||
valid: bool = response is not None and response.status_code == 200
|
||||
|
||||
if valid:
|
||||
print_if_interactive(f"{Fore.BLUE}Please scan the following QR code to log in.")
|
||||
|
||||
present_image_image(
|
||||
response.content,
|
||||
message_url = f"QR code URL: {Fore.BLUE}http://{args.host or '127.0.0.1'}:31415",
|
||||
message_file_saved = "QR code image saved at: {}",
|
||||
message_manually_open_file = "Please open {} and scan the QR code."
|
||||
)
|
||||
print_if_interactive()
|
||||
print_if_interactive(f"{Fore.BLUE}Alternatively you can visit the following URL:")
|
||||
print_if_interactive(f"{Fore.BLUE} {self._login_url}")
|
||||
print_if_interactive()
|
||||
return True
|
||||
else:
|
||||
_LOGGER.error("login_step_2: HTTP status: %s; Response: %s", response.status_code, response.text[:500])
|
||||
return False
|
||||
|
||||
def login_step_3(self) -> bool:
|
||||
_LOGGER.debug("login_step_3")
|
||||
|
||||
url = self._long_polling_url
|
||||
_LOGGER.debug("Long polling URL: " + url)
|
||||
|
||||
start_time = time.time()
|
||||
# Start long polling
|
||||
while True:
|
||||
try:
|
||||
response = self._session.get(url, timeout=10)
|
||||
except requests.exceptions.Timeout:
|
||||
_LOGGER.debug("Long polling timed out, retrying...")
|
||||
if time.time() - start_time > self._timeout:
|
||||
_LOGGER.debug("Long polling timed out after {} seconds.".format(self._timeout))
|
||||
break
|
||||
continue
|
||||
except requests.exceptions.RequestException as e:
|
||||
_LOGGER.error(f"An error occurred: {e}")
|
||||
break
|
||||
|
||||
if response.status_code == 200:
|
||||
break
|
||||
else:
|
||||
_LOGGER.error("Long polling failed, retrying...")
|
||||
|
||||
if response.status_code != 200:
|
||||
_LOGGER.error("Long polling failed with status code: " + str(response.status_code))
|
||||
return False
|
||||
|
||||
_LOGGER.debug("Login successful!")
|
||||
_LOGGER.debug("Response data:")
|
||||
|
||||
response_data = self.to_json(response.text)
|
||||
_LOGGER.debug(response_data)
|
||||
|
||||
self.userId = response_data["userId"]
|
||||
self._ssecurity = response_data["ssecurity"]
|
||||
self._cUserId = response_data["cUserId"]
|
||||
self._pass_token = response_data["passToken"]
|
||||
self._location = response_data["location"]
|
||||
|
||||
_LOGGER.debug("User ID: " + str(self.userId))
|
||||
_LOGGER.debug("Ssecurity: " + str(self._ssecurity))
|
||||
_LOGGER.debug("CUser ID: " + str(self._cUserId))
|
||||
_LOGGER.debug("Pass token: " + str(self._pass_token))
|
||||
_LOGGER.debug("Pass token: " + str(self._location))
|
||||
|
||||
return True
|
||||
|
||||
def login_step_4(self) -> bool:
|
||||
_LOGGER.debug("login_step_4")
|
||||
_LOGGER.debug("Fetching service token...")
|
||||
|
||||
if not (location := self._location):
|
||||
_LOGGER.error("No location found.")
|
||||
return False
|
||||
|
||||
response = self._session.get(location, headers={"content-type": "application/x-www-form-urlencoded"})
|
||||
if response.status_code != 200:
|
||||
return False
|
||||
|
||||
self._serviceToken = response.cookies["serviceToken"]
|
||||
_LOGGER.debug("Service token: " + str(self._serviceToken))
|
||||
return True
|
||||
|
||||
|
||||
def print_if_interactive(value: str="") -> None:
|
||||
if not args.non_interactive:
|
||||
print(value)
|
||||
|
||||
def print_tabbed(value: str, tab: int) -> None:
|
||||
print_if_interactive(" " * tab + value)
|
||||
|
||||
|
||||
def print_entry(key: str, value: str, tab: int) -> None:
|
||||
if value:
|
||||
print_tabbed(f'{Fore.YELLOW}{key + ":": <10}{Style.RESET_ALL}{value}', tab)
|
||||
|
||||
|
||||
def print_banner() -> None:
|
||||
print_if_interactive(Fore.YELLOW + Style.BRIGHT + r"""
|
||||
Xiaomi Cloud
|
||||
___ ____ _ _ ____ _ _ ____ ____ _ _ ___ ____ ____ ____ ___ ____ ____
|
||||
| | | |_/ |___ |\ | [__ |___ \/ | |__/ |__| | | | | |__/
|
||||
| |__| | \_ |___ | \| ___] |___ _/\_ | | \ | | |___ | |__| | \
|
||||
""" + Style.NORMAL +
|
||||
""" by Piotr Machowski
|
||||
|
||||
""")
|
||||
|
||||
|
||||
def start_image_server(image: bytes) -> None:
|
||||
class ImgHttpHandler(BaseHTTPRequestHandler):
|
||||
|
||||
def do_GET(self) -> None:
|
||||
self.send_response(200)
|
||||
self.end_headers()
|
||||
self.wfile.write(image)
|
||||
|
||||
def log_message(self, msg, *args) -> None:
|
||||
_LOGGER.debug(msg, *args)
|
||||
|
||||
httpd = HTTPServer(("", 31415), ImgHttpHandler)
|
||||
_LOGGER.info("server address: %s", httpd.server_address)
|
||||
_LOGGER.info("hostname: %s", socket.gethostname())
|
||||
|
||||
thread = threading.Thread(target = httpd.serve_forever)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
|
||||
def present_image_image(
|
||||
image_content: bytes,
|
||||
message_url: str,
|
||||
message_file_saved: str,
|
||||
message_manually_open_file: str,
|
||||
) -> None:
|
||||
try:
|
||||
# Try to serve an image file
|
||||
start_image_server(image_content)
|
||||
print_if_interactive(message_url)
|
||||
except Exception as e1:
|
||||
_LOGGER.debug(e1)
|
||||
# Save image to a temporary file
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
|
||||
tmp.write(image_content)
|
||||
tmp_path: str = tmp.name
|
||||
print_if_interactive(message_file_saved.format(tmp_path))
|
||||
try:
|
||||
img = Image.open(tmp_path)
|
||||
img.show()
|
||||
except Exception as e2:
|
||||
_LOGGER.debug(e2)
|
||||
print_if_interactive(message_manually_open_file.format(tmp_path))
|
||||
|
||||
|
||||
def main() -> None:
|
||||
print_banner()
|
||||
if args.non_interactive:
|
||||
connector = PasswordXiaomiCloudConnector()
|
||||
else:
|
||||
print_if_interactive("Please select a way to log in:")
|
||||
print_if_interactive(f" p{Fore.BLUE} - using password")
|
||||
print_if_interactive(f" q{Fore.BLUE} - using QR code")
|
||||
log_in_method = ""
|
||||
while not log_in_method in ["P", "Q"]:
|
||||
log_in_method = input("p/q: ").upper()
|
||||
if log_in_method == "P":
|
||||
connector = PasswordXiaomiCloudConnector()
|
||||
else:
|
||||
connector = QrCodeXiaomiCloudConnector()
|
||||
print_if_interactive()
|
||||
|
||||
logged = connector.login()
|
||||
if logged:
|
||||
print_if_interactive(f"{Fore.GREEN}Logged in.")
|
||||
print_if_interactive()
|
||||
servers_to_check = get_servers_to_check()
|
||||
print_if_interactive()
|
||||
output = []
|
||||
for current_server in servers_to_check:
|
||||
all_homes = []
|
||||
homes = connector.get_homes(current_server)
|
||||
if homes is not None:
|
||||
for h in homes["result"]["homelist"]:
|
||||
all_homes.append({"home_id": h["id"], "home_owner": connector.userId})
|
||||
dev_cnt = connector.get_dev_cnt(current_server)
|
||||
if dev_cnt is not None:
|
||||
for h in dev_cnt["result"]["share"]["share_family"]:
|
||||
all_homes.append({"home_id": h["home_id"], "home_owner": h["home_owner"]})
|
||||
|
||||
if len(all_homes) == 0:
|
||||
print_if_interactive(f'{Fore.RED}No homes found for server "{current_server}".')
|
||||
|
||||
for home in all_homes:
|
||||
devices = connector.get_devices(current_server, home["home_id"], home["home_owner"])
|
||||
home["devices"] = []
|
||||
if devices is not None:
|
||||
if devices["result"]["device_info"] is None or len(devices["result"]["device_info"]) == 0:
|
||||
print_if_interactive(f'{Fore.RED}No devices found for server "{current_server}" @ home "{home["home_id"]}".')
|
||||
continue
|
||||
print_if_interactive(f'Devices found for server "{current_server}" @ home "{home["home_id"]}":')
|
||||
for device in devices["result"]["device_info"]:
|
||||
device_data = {**device}
|
||||
print_tabbed(f"{Fore.BLUE}---------", 3)
|
||||
if "name" in device:
|
||||
print_entry("NAME", device["name"], 3)
|
||||
if "did" in device:
|
||||
print_entry("ID", device["did"], 3)
|
||||
if "blt" in device["did"]:
|
||||
beaconkey = connector.get_beaconkey(current_server, device["did"])
|
||||
if beaconkey and "result" in beaconkey and "beaconkey" in beaconkey["result"]:
|
||||
print_entry("BLE KEY", beaconkey["result"]["beaconkey"], 3)
|
||||
device_data["BLE_DATA"] = beaconkey["result"]
|
||||
if "mac" in device:
|
||||
print_entry("MAC", device["mac"], 3)
|
||||
if "localip" in device:
|
||||
print_entry("IP", device["localip"], 3)
|
||||
if "token" in device:
|
||||
print_entry("TOKEN", device["token"], 3)
|
||||
if "model" in device:
|
||||
print_entry("MODEL", device["model"], 3)
|
||||
home["devices"].append(device_data)
|
||||
print_tabbed(f"{Fore.BLUE}---------", 3)
|
||||
print_if_interactive()
|
||||
else:
|
||||
print_if_interactive(f"{Fore.RED}Unable to get devices from server {current_server}.")
|
||||
output.append({"server": current_server, "homes": all_homes})
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
f.write(json.dumps(output, indent=4))
|
||||
else:
|
||||
print_if_interactive(f"{Fore.RED}Unable to log in.")
|
||||
|
||||
if not args.non_interactive:
|
||||
print_if_interactive()
|
||||
print_if_interactive("Press ENTER to finish")
|
||||
input()
|
||||
|
||||
|
||||
def get_servers_to_check() -> list[str]:
|
||||
servers_str = ", ".join(SERVERS)
|
||||
if args.server is not None:
|
||||
server = args.server
|
||||
elif args.non_interactive:
|
||||
server = ""
|
||||
else:
|
||||
print_if_interactive(
|
||||
f"Select server {Fore.BLUE}(one of: {servers_str}; Leave empty to check all available){Style.RESET_ALL}:")
|
||||
server = input()
|
||||
while server not in ["", *SERVERS]:
|
||||
print_if_interactive(f"{Fore.RED}Invalid server provided. Valid values: {servers_str}")
|
||||
print_if_interactive("Server:")
|
||||
server = input()
|
||||
|
||||
print_if_interactive()
|
||||
if not server == "":
|
||||
servers_to_check = [server]
|
||||
else:
|
||||
servers_to_check = [*SERVERS]
|
||||
return servers_to_check
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user