#!/opt/bin/python3
# -*- coding: utf-8 -*-
"""Tiny web editor for a mihomo configuration.

Serves a single-page editor on PORT, stores configuration profiles in
PROFILES_DIR (CONFIG_PATH is a symlink to the active one), keeps timestamped
backups in BACKUP_DIR and relays ``/mihomo_panel/...`` requests to the
``external-controller`` port found in the active config.
"""
import glob
import http.server
import json
import os
import re
import shutil
import socketserver
import subprocess
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime

# --- SETTINGS ---
PORT = 8888
CONFIG_DIR = "/opt/etc/mihomo"
CONFIG_PATH = os.path.join(CONFIG_DIR, "config.yaml")
PROFILES_DIR = os.path.join(CONFIG_DIR, "profiles")
BACKUP_DIR = os.path.join(CONFIG_DIR, "backup")
LOG_FILE = "/tmp/mihomo_last_restart.log"
RESTART_CMD = "xkeen -restart > " + LOG_FILE + " 2>&1"

# Seconds to wait for the panel backend before giving up on a relayed request.
PANEL_TIMEOUT = 30

# Backend response headers that must not be relayed verbatim: the body is
# re-sent already de-chunked, and send_response() emits Server/Date itself.
_DROPPED_RESPONSE_HEADERS = frozenset((
    'access-control-allow-origin', 'server', 'date',
    'transfer-encoding', 'connection', 'keep-alive', 'content-length',
))

# Matches a non-commented ``external-controller: [host]:port`` line and
# captures the digits after the last colon, so bracketed IPv6 hosts work too.
_PANEL_PORT_RE = re.compile(
    r"^[ \t]*external-controller:[ \t]*['\"]?[^\n#]*:(\d+)", re.MULTILINE)


# --- INITIALIZATION ---
def _init_storage():
    """Create the work directories and make CONFIG_PATH a symlink to a profile.

    A regular ``config.yaml`` is moved to ``profiles/default.yaml``; when no
    usable config exists, a minimal default profile is created and linked.
    """
    os.makedirs(BACKUP_DIR, exist_ok=True)
    os.makedirs(PROFILES_DIR, exist_ok=True)
    default_profile = os.path.join(PROFILES_DIR, "default.yaml")
    if os.path.exists(CONFIG_PATH) and not os.path.islink(CONFIG_PATH):
        shutil.move(CONFIG_PATH, default_profile)
        os.symlink(default_profile, CONFIG_PATH)
    elif not os.path.exists(CONFIG_PATH):
        # exists() is False for a dangling symlink, but the link itself would
        # still make os.symlink() raise FileExistsError, so drop it first.
        if os.path.lexists(CONFIG_PATH):
            os.unlink(CONFIG_PATH)
        # Never clobber a default profile that is already there.
        if not os.path.exists(default_profile):
            with open(default_profile, 'w', encoding='utf-8') as f:
                f.write("proxies: []\n")
        os.symlink(default_profile, CONFIG_PATH)


def _read_text(path, default=""):
    """Return the UTF-8 text of *path*, or *default* when it cannot be read."""
    try:
        with open(path, encoding='utf-8', errors='replace') as f:
            return f.read()
    except OSError:
        return default


def _profile_path(name):
    """Return the file path of profile *name*, or None for an unsafe name.

    Names are rejected when empty or when they contain a path component, so a
    request cannot reach files outside PROFILES_DIR.
    """
    if not name or name in ('.', '..') or name != os.path.basename(name):
        return None
    return os.path.join(PROFILES_DIR, name + ".yaml")


def _extract_panel_port(config_text):
    """Return the external-controller port found in *config_text*, or ''."""
    match = _PANEL_PORT_RE.search(config_text)
    return match.group(1) if match else ''


# --- PARSERS ---
def parse_vless(link):
    """Convert a ``vless://`` share link into a mihomo proxy definition.

    Args:
        link: share link of the form
            ``vless://<uuid>@<host>:<port>?<params>#<name>``.

    Returns:
        ``({"yaml": <proxy entry as YAML text>, "name": <proxy name>}, None)``
        on success, or ``(None, <error message>)`` on failure. The function
        never raises; unexpected errors are reported as the message.
    """
    try:
        if not link.startswith("vless://"):
            return None, "Link error"
        main = link[8:]
        name = "VLESS"
        if '#' in main:
            main, fragment = main.split('#', 1)
            name = urllib.parse.unquote(fragment).strip()
        # Strip characters that would break the quoted YAML name; fall back to
        # the default when nothing usable is left.
        name = re.sub(r'[\[\]\{\}\"\']', '', name) or "VLESS"

        user_srv, _, query = main.partition('?')
        params = urllib.parse.parse_qs(query)
        if '@' not in user_srv:
            return None, "No UUID"
        uuid, srv_port = user_srv.split('@', 1)
        if ':' not in srv_port:
            return None, "No Port"
        # rsplit keeps the colons of a bracketed IPv6 host on the host side.
        srv, port = srv_port.rsplit(':', 1)
        srv = srv.replace('[', '').replace(']', '')
        port = port.rstrip('/')  # links are often written as host:443/?...
        if not port.isdigit():
            return None, "Bad Port"

        def get(key):
            return params.get(key, [''])[0]

        # Keys of a "- " list item must line up two columns in; nested option
        # maps are indented two more columns per level.
        y = [
            '- name: "' + name + '"',
            '  type: vless',
            '  server: ' + srv,
            '  port: ' + port,
            '  uuid: ' + uuid,
            '  udp: true',
            '  network: ' + (get('type') or 'tcp'),
        ]
        if get('flow'):
            y.append('  flow: ' + get('flow'))

        security = get('security')
        if security and security != 'none':  # "none" means plain transport
            y.append('  tls: true')
        if security == 'reality':
            y.extend([
                '  servername: ' + get('sni'),
                '  client-fingerprint: ' + (get('fp') or 'chrome'),
                '  reality-opts:',
                '    public-key: ' + get('pbk'),
            ])
            if get('sid'):
                y.append('    short-id: ' + get('sid'))
        else:
            if get('sni'):
                y.append('  servername: ' + get('sni'))
            if get('fp'):
                y.append('  client-fingerprint: ' + get('fp'))
        if get('alpn'):
            av = get("alpn").replace(",", '", "')
            y.append('  alpn: ["' + av + '"]')

        if get('type') == 'ws':
            ws_opts = []
            if get('path'):
                ws_opts.append('    path: ' + get('path'))
            if get('host'):
                ws_opts.extend(['    headers:', '      Host: ' + get('host')])
            if ws_opts:  # avoid emitting an empty "ws-opts:" mapping
                y.append('  ws-opts:')
                y.extend(ws_opts)
        elif get('type') == 'grpc' and get('serviceName'):
            y.extend(['  grpc-opts:',
                      '    grpc-service-name: ' + get('serviceName')])
        return {"yaml": "\n".join(y), "name": name}, None
    except Exception as e:  # parser boundary: report instead of raising
        return None, str(e)


def insert_proxy_logic(content, proxy_name, target_groups):
    """Add *proxy_name* to the ``proxies:`` list of the selected proxy groups.

    The entry is placed just before the first ``DIRECT``/``REJECT`` item of a
    group's list, or at the end of the list when there is none. Only
    block-style lists (one ``- item`` per line) are handled; inline
    ``proxies: [a, b]`` lists are not.

    Args:
        content: full configuration text.
        proxy_name: name to insert (written as a double-quoted YAML string).
        target_groups: collection of group names to update.

    Returns:
        The updated configuration text, lines joined with ``\\n``.
    """
    entry = '- "' + proxy_name + '"'
    out = []
    in_group_section = False
    current_group = None
    in_proxies_list = False
    list_indent = -1      # column of the group's "proxies:" key
    item_indent = None    # column of the list items, once one has been seen
    done = set()          # groups that already received the entry

    def flush_pending():
        """Close the open list, appending the entry if still missing."""
        nonlocal in_proxies_list
        if (in_proxies_list and current_group in target_groups
                and current_group not in done):
            pad = list_indent + 2 if item_indent is None else item_indent
            out.append(" " * pad + entry)
            done.add(current_group)
        in_proxies_list = False

    for line in content.splitlines():
        stripped = line.strip()
        indent = len(line) - len(line.lstrip())
        is_new_group = stripped.startswith('- name:')

        if is_new_group:
            flush_pending()

        if stripped.startswith('proxy-groups:'):
            in_group_section = True
        elif (in_group_section and indent == 0 and stripped
              and not stripped.startswith(('#', '-'))):
            # A new top-level key ends the section; a zero-indented "- ..."
            # line is still an item of it. Flush first so the last group is
            # not silently skipped.
            flush_pending()
            in_group_section = False
            current_group = None

        if in_group_section:
            if is_new_group:
                raw_name = stripped.split(':', 1)[1].strip()
                current_group = raw_name.strip("'").strip('"')

            if current_group in target_groups and stripped.startswith('proxies:'):
                in_proxies_list = True
                list_indent = indent
                item_indent = None
                out.append(line)
                continue

            if in_proxies_list:
                if not stripped or stripped.startswith('#'):
                    out.append(line)
                    continue
                is_item = stripped.startswith('-')
                # Items may sit at the same column as the "proxies:" key.
                list_ended = indent < list_indent or (
                    indent == list_indent and not is_item)
                if list_ended:
                    flush_pending()
                elif (('DIRECT' in stripped or 'REJECT' in stripped)
                      and current_group not in done):
                    out.append(" " * indent + entry)
                    done.add(current_group)
                elif is_item and item_indent is None:
                    item_indent = indent
        out.append(line)

    flush_pending()
    return "\n".join(out)


def _insert_proxy_definition(content, proxy_yaml):
    """Insert the proxy entry *proxy_yaml* at the top of the ``proxies:`` list.

    Only the top-level ``proxies:`` key is considered (a group's own
    ``proxies:`` list is indented). The entry is indented like the existing
    entries; an empty ``proxies: []`` is turned into a block list first. When
    the key is missing, a new ``proxies:`` section is appended.
    """
    lines = content.splitlines()
    block = proxy_yaml.splitlines()
    for idx, line in enumerate(lines):
        if not line.startswith('proxies:'):
            continue
        value = line[len('proxies:'):].split('#', 1)[0].strip()
        if value == '[]':
            lines[idx] = 'proxies:'
        pad = 2
        for follower in lines[idx + 1:]:
            text = follower.strip()
            if not text or text.startswith('#'):
                continue
            if text.startswith('-'):
                pad = len(follower) - len(follower.lstrip())
            break
        lines[idx + 1:idx + 1] = [" " * pad + row for row in block]
        return "\n".join(lines)
    lines.append("proxies:")
    lines.extend("  " + row for row in block)
    return "\n".join(lines)


HTML_TEMPLATE = """ Mihomo Editor v18.4
✅ Успешно сохранено

Mihomo Studio

v18.4 Auto-Panel
Loaded: __TIME__

Профили

Быстрый VLESS

Бэкапы

Оставить:
__BACKUPS__

Добавить в группы:

Удалить прокси

Консоль

...

Новый профиль

"""


class H(http.server.SimpleHTTPRequestHandler):
    """Request handler: editor page, JSON actions and panel relay."""

    def end_headers(self):
        """Finish the headers, always forbidding client-side caching."""
        self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate')
        self.send_header('Pragma', 'no-cache')
        self.send_header('Expires', '0')
        super().end_headers()

    def get_bks(self):
        """Return the markup listing the ten most recent backups."""
        newest = sorted(glob.glob(BACKUP_DIR + "/*.yaml"),
                        key=os.path.getmtime, reverse=True)[:10]
        rows = []
        for path in newest:
            n = os.path.basename(path)
            t = datetime.fromtimestamp(os.path.getmtime(path)).strftime("%d.%m %H:%M")
            rows.append(f'\n{n}{t}\n')
        return "".join(rows) or '\nНет бэкапов\n'

    def get_prof_opts(self):
        """Return the markup for the profile selector."""
        curr = ""
        if os.path.exists(CONFIG_PATH):
            real = os.path.realpath(CONFIG_PATH)
            curr = os.path.splitext(os.path.basename(real))[0]
        opts = ""
        for f in sorted(glob.glob(PROFILES_DIR + "/*.yaml")):
            n = os.path.splitext(os.path.basename(f))[0]
            sel = "selected" if n == curr else ""
            # NOTE(review): the emitted string is empty, so `n` and `sel` are
            # computed but never interpolated - confirm the intended markup.
            opts += f''
        return opts

    def get_panel_port(self):
        """Return the external-controller port of the active config, or ''."""
        try:
            with open(CONFIG_PATH, 'r', encoding='utf-8', errors='replace') as f:
                return _extract_panel_port(f.read())
        except OSError:
            return ''

    # --- PROXY LOGIC ---
    def _relay_response(self, status, headers, payload):
        """Send a backend response to the client with a correct framing."""
        self.send_response(status)
        for k, v in headers:
            if k.lower() not in _DROPPED_RESPONSE_HEADERS:
                self.send_header(k, v)
        # 1xx/204/304 responses carry no body and no Content-Length.
        if status >= 200 and status not in (204, 304):
            self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        try:
            self.wfile.write(payload)
        except (BrokenPipeError, ConnectionResetError):
            pass  # the client went away; nothing left to deliver

    def proxy_pass(self, method):
        """Relay the current ``/mihomo_panel/...`` request to the panel backend.

        Args:
            method: HTTP method to use for the backend request.

        Replies 500 when the panel port is not configured and 502 when the
        backend cannot be reached; backend HTTP errors are passed through.
        """
        panel_port = self.get_panel_port()
        if not panel_port:
            self.send_error(500, "Panel port not found in config")
            return

        # Strip the prefix
        rel_path = self.path.replace('/mihomo_panel/', '', 1)
        target_url = f"http://127.0.0.1:{panel_port}/{rel_path}"

        try:
            content_len = int(self.headers.get('Content-Length') or 0)
        except ValueError:
            content_len = 0
        body = self.rfile.read(content_len) if content_len > 0 else None

        try:
            req = urllib.request.Request(target_url, data=body, method=method)
            for k, v in self.headers.items():
                if k.lower() not in ['host', 'origin', 'referer']:
                    req.add_header(k, v)
            # Important: the backend must see itself as the Host.
            req.add_header('Host', f'127.0.0.1:{panel_port}')
            # Read the whole answer before replying, so a backend failure can
            # still be reported with a clean error response.
            with urllib.request.urlopen(req, timeout=PANEL_TIMEOUT) as resp:
                status, headers, payload = resp.status, resp.getheaders(), resp.read()
        except urllib.error.HTTPError as e:
            status, headers, payload = e.code, list(e.headers.items()), e.read()
        except Exception as e:  # relay boundary: report, never crash the server
            self.send_error(502, "Panel backend unavailable", str(e))
            return
        self._relay_response(status, headers, payload)

    def do_GET(self):
        """Serve the editor page at ``/`` and relay panel requests."""
        if self.path.startswith('/mihomo_panel/'):
            self.proxy_pass('GET')
            return
        if self.path != '/':
            return self.send_error(404)
        c = _read_text(CONFIG_PATH, "proxies:\n")
        # Embed the config last so placeholder-like text inside it is not
        # rewritten by the other substitutions.
        out = HTML_TEMPLATE.replace('__BACKUPS__', self.get_bks()) \
            .replace('__PROFILES__', self.get_prof_opts()) \
            .replace('__TIME__', datetime.now().strftime("%H:%M:%S")) \
            .replace('__JSON_CONTENT__', json.dumps(c))
        self.send_response(200)
        self.send_header('Content-type', 'text/html;charset=utf-8')
        self.end_headers()
        self.wfile.write(out.encode('utf-8'))

    def _send_json(self, payload):
        """Reply 200 with *payload* serialized as JSON."""
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(payload).encode('utf-8'))

    def do_POST(self):
        """Run the editor action named by the ``act`` form field.

        The answer is always a JSON object with status 200; failures are
        reported through its ``error`` key.
        """
        if self.path.startswith('/mihomo_panel/'):
            self.proxy_pass('POST')
            return
        try:
            length = int(self.headers.get('Content-Length') or 0)
        except ValueError:
            length = 0
        raw = self.rfile.read(length).decode('utf-8', 'ignore') if length > 0 else ''
        p = {k: v[0] for k, v in urllib.parse.parse_qs(raw).items()}

        handlers = {
            'switch_prof': self._act_switch_prof,
            'add_prof': self._act_add_prof,
            'del_prof': self._act_del_prof,
            'parse': self._act_parse,
            'apply_insert': self._act_apply_insert,
            'clean_backups': self._act_clean_backups,
            'del_backup': self._act_del_backup,
            'rest': self._act_restore,
            'save': self._act_save,
            'restart': self._act_restart,
        }
        handler = handlers.get(p.get('act'))
        if handler is None:
            result = {'error': 'Unknown action'}
        else:
            try:
                result = handler(p)
            except Exception as e:  # action boundary: answer with the error
                result = {'error': str(e)}
        self._send_json(result)

    # --- PROFILE ACTIONS ---
    def _act_switch_prof(self, p):
        """Point CONFIG_PATH at the requested profile."""
        target = _profile_path(p.get('name'))
        if not target or not os.path.exists(target):
            return {'error': 'Profile not found'}
        if os.path.lexists(CONFIG_PATH):
            os.unlink(CONFIG_PATH)
        os.symlink(target, CONFIG_PATH)
        return {'status': 'ok'}

    def _act_add_prof(self, p):
        """Create a new profile; activate it when no config is active."""
        target = _profile_path(p.get('name'))
        if not target:
            return {'error': 'Invalid profile name'}
        if os.path.exists(target):
            return {'error': 'Профиль с таким именем уже существует'}
        with open(target, 'w', encoding='utf-8') as f:
            f.write(p.get('content', ''))
        if not os.path.exists(CONFIG_PATH):
            if os.path.lexists(CONFIG_PATH):  # dangling symlink
                os.unlink(CONFIG_PATH)
            os.symlink(target, CONFIG_PATH)
        return {'status': 'ok'}

    def _act_del_prof(self, p):
        """Delete a profile unless it is the active one."""
        target = _profile_path(p.get('name'))
        if not target:
            return {'error': 'File not found'}
        if os.path.realpath(target) == os.path.realpath(CONFIG_PATH):
            return {'error': 'Нельзя удалить активный профиль. Сначала переключитесь на другой.'}
        if os.path.exists(target):
            os.remove(target)
            return {'status': 'ok'}
        return {'error': 'File not found'}

    # --- EDITOR ACTIONS ---
    def _act_parse(self, p):
        """Parse a VLESS link into a proxy definition."""
        d, e = parse_vless(p.get('link', ''))
        return d if d else {'error': e}

    def _act_apply_insert(self, p):
        """Insert a proxy definition and reference it from the chosen groups."""
        targets = json.loads(p.get('targets', '[]'))
        with_proxy = _insert_proxy_definition(p.get('content', ''),
                                              p.get('proxy_yaml', ''))
        return {'new_content': insert_proxy_logic(
            with_proxy, p.get('proxy_name', ''), targets)}

    def _act_clean_backups(self, p):
        """Keep only the ``limit`` newest backups."""
        try:
            limit = max(int(p.get('limit', 5)), 0)
        except ValueError:
            limit = 5
        files = sorted(glob.glob(BACKUP_DIR + "/*.yaml"),
                       key=os.path.getmtime, reverse=True)
        for f in files[limit:]:
            try:
                os.remove(f)
            except OSError:
                pass  # best effort: a backup that cannot be removed is kept
        return {'backups': self.get_bks()}

    def _act_del_backup(self, p):
        """Delete one backup file by name."""
        fname = os.path.basename(p.get('f') or '')
        path = os.path.join(BACKUP_DIR, fname)
        if fname and os.path.isfile(path):
            os.remove(path)
        return {'backups': self.get_bks()}

    def _act_restore(self, p):
        """Copy a backup over the active config."""
        fname = os.path.basename(p.get('f') or '')
        path = os.path.join(BACKUP_DIR, fname)
        if not fname or not os.path.isfile(path):
            return {'error': 'File not found'}
        shutil.copy(path, CONFIG_PATH)
        return {'status': 'ok'}

    def _write_config(self, p):
        """Back up the active config, then write the submitted content."""
        new_c = p.get('content', '').replace('\r\n', '\n')
        if os.path.exists(CONFIG_PATH):
            real_p = os.path.basename(os.path.realpath(CONFIG_PATH))
            prof_n = os.path.splitext(real_p)[0]
            ts = datetime.now().strftime('%Y%m%d_%H%M%S')
            shutil.copy(CONFIG_PATH, f"{BACKUP_DIR}/{prof_n}_{ts}.yaml")
        with open(CONFIG_PATH, 'w', encoding='utf-8') as f:
            f.write(new_c)
            f.flush()
            os.fsync(f.fileno())

    def _act_save(self, p):
        """Save the submitted config."""
        self._write_config(p)
        return {'status': 'ok', 'time': datetime.now().strftime("%H:%M:%S"),
                'backups': self.get_bks()}

    def _act_restart(self, p):
        """Save the submitted config, restart the service, return its log."""
        self._write_config(p)
        my_env = os.environ.copy()
        my_env["TERM"] = "xterm-256color"
        # RESTART_CMD is a fixed string that relies on shell redirection;
        # no request data is ever interpolated into it.
        subprocess.run(RESTART_CMD, shell=True, env=my_env)
        return {'log': _read_text(LOG_FILE, "Log empty")}

    def do_PUT(self):
        """Relay panel PUT requests; anything else is not allowed."""
        if self.path.startswith('/mihomo_panel/'):
            self.proxy_pass('PUT')
            return
        self.send_error(405, "Method Not Allowed")

    def do_PATCH(self):
        """Relay panel PATCH requests; anything else is not allowed."""
        if self.path.startswith('/mihomo_panel/'):
            self.proxy_pass('PATCH')
            return
        self.send_error(405, "Method Not Allowed")

    def do_DELETE(self):
        """Relay panel DELETE requests; anything else is not allowed."""
        if self.path.startswith('/mihomo_panel/'):
            self.proxy_pass('DELETE')
            return
        self.send_error(405, "Method Not Allowed")


def main():
    """Prepare the storage layout and serve the editor until stopped."""
    _init_storage()
    try:
        socketserver.TCPServer.allow_reuse_address = True
        with socketserver.TCPServer(("", PORT), H) as httpd:
            httpd.serve_forever()
    except Exception as e:
        print(e)


if __name__ == "__main__":
    main()