#!/opt/bin/python3
# -*- coding: utf-8 -*-
"""Mihomo Studio: a tiny web editor for the Mihomo (Clash.Meta) config.

The script serves a single HTML page on ``PORT``, handles form-encoded POST
"actions" (profiles, backups, proxy import, save/restart) and relays requests
under ``/mihomo_panel/`` to the external controller declared in the config.
"""
import glob
import html
import http.server
import json
import os
import re
import shutil
import socketserver
import subprocess
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime

# --- SETTINGS ---
PORT = 8888
CONFIG_DIR = "/opt/etc/mihomo"
CONFIG_PATH = os.path.join(CONFIG_DIR, "config.yaml")
PROFILES_DIR = os.path.join(CONFIG_DIR, "profiles")
BACKUP_DIR = os.path.join(CONFIG_DIR, "backup")
LOG_FILE = "/tmp/mihomo_last_restart.log"
RESTART_CMD = "xkeen -restart > " + LOG_FILE + " 2>&1"

# Characters removed from proxy names because the name is emitted inside a
# double-quoted YAML scalar.
_NAME_UNSAFE = re.compile(r'[\[\]\{\}\"\']')

# A renamed value must be followed by end-of-line, a flow separator or a
# comment; otherwise "US" would also rewrite the start of "US-2".
_VALUE_END = r"(?=[ \t\r]*(?:$|[,}\]])|[ \t]+#)"

# Headers that describe one hop only and must not be relayed by the proxy.
_HOP_HEADERS = frozenset(
    ['connection', 'keep-alive', 'transfer-encoding', 'upgrade', 'content-length'])
_SKIP_REQUEST_HEADERS = _HOP_HEADERS | frozenset(['host', 'origin', 'referer'])
_SKIP_RESPONSE_HEADERS = _HOP_HEADERS | frozenset(
    ['access-control-allow-origin', 'server', 'date'])


# --- INITIALISATION ---
def activate_profile(target):
    """Point ``CONFIG_PATH`` at *target*, replacing any file or (dangling) link."""
    if os.path.lexists(CONFIG_PATH):
        os.unlink(CONFIG_PATH)
    os.symlink(target, CONFIG_PATH)


def ensure_layout():
    """Create the backup/profile directories and make the config a symlink.

    A regular ``config.yaml`` is moved to ``profiles/default.yaml`` and linked
    back. When no usable config exists, ``default.yaml`` is created (only if it
    is missing, so existing content is never wiped) and linked.
    """
    os.makedirs(BACKUP_DIR, exist_ok=True)
    os.makedirs(PROFILES_DIR, exist_ok=True)
    default_profile = os.path.join(PROFILES_DIR, "default.yaml")
    if os.path.exists(CONFIG_PATH) and not os.path.islink(CONFIG_PATH):
        shutil.move(CONFIG_PATH, default_profile)
        os.symlink(default_profile, CONFIG_PATH)
    elif not os.path.exists(CONFIG_PATH):
        if not os.path.exists(default_profile):
            with open(default_profile, 'w', encoding='utf-8') as f:
                f.write("proxies: []\n")
        # lexists-aware: a dangling symlink would make os.symlink() fail.
        activate_profile(default_profile)


def profile_path(name):
    """Return the file path for profile *name*, or None if the name is unsafe.

    Names that are empty or contain a path component are rejected so a request
    cannot reach files outside ``PROFILES_DIR``.
    """
    if not name or name in ('.', '..') or '\x00' in name:
        return None
    if os.path.basename(name) != name:
        return None
    return os.path.join(PROFILES_DIR, name + ".yaml")


def backup_path(name):
    """Return the path of backup file *name* inside ``BACKUP_DIR``, or None."""
    base = os.path.basename(name or '')
    if not base or base in ('.', '..'):
        return None
    return os.path.join(BACKUP_DIR, base)


# --- PARSERS ---
def parse_vless(link):
    """Convert a ``vless://`` share link into a Mihomo proxy YAML snippet.

    Returns ``({"yaml": str, "name": str}, None)`` on success and
    ``(None, message)`` on failure. The snippet is one list item whose
    properties are indented by two spaces and whose option maps
    (reality-opts, ws-opts, grpc-opts) are nested two spaces deeper.
    """
    try:
        if not link.startswith("vless://"):
            return None, "Link error"
        main = link[8:]
        name = "VLESS"
        if '#' in main:
            main, fragment = main.split('#', 1)
            name = urllib.parse.unquote(fragment).strip()
        name = _NAME_UNSAFE.sub('', name)

        user_srv, _, query = main.partition('?')
        params = urllib.parse.parse_qs(query)
        if '@' not in user_srv:
            return None, "No UUID"
        uuid, srv_port = user_srv.split('@', 1)
        # Links are often written as host:443/?query - drop the path slash.
        srv_port = srv_port.rstrip('/')
        if ':' not in srv_port:
            return None, "No Port"
        if ']' in srv_port:  # bracketed IPv6 literal
            srv, port = srv_port.rsplit(':', 1)
            srv = srv.replace('[', '').replace(']', '')
        else:
            srv, port = srv_port.split(':')
        if not port.isdigit():
            return None, "No Port"

        def get(k):
            return params.get(k, [''])[0]

        y = [
            '- name: "' + name + '"',
            '  type: vless',
            '  server: ' + srv,
            '  port: ' + port,
            '  uuid: ' + uuid,
            '  udp: true',
        ]
        y.append('  network: ' + (get('type') or 'tcp'))
        if get('flow'):
            y.append('  flow: ' + get('flow'))

        security = get('security')
        # "security=none" is an explicit "no TLS", not a TLS flavour.
        if security and security != 'none':
            y.append('  tls: true')
            if security == 'reality':
                y.extend([
                    '  servername: ' + get('sni'),
                    '  client-fingerprint: ' + (get('fp') or 'chrome'),
                    '  reality-opts:',
                    '    public-key: ' + get('pbk'),
                ])
                if get('sid'):
                    # Quoted: an all-digit short id would otherwise be read
                    # back as a YAML number.
                    y.append('    short-id: "' + get('sid') + '"')
            else:
                if get('sni'):
                    y.append('  servername: ' + get('sni'))
                if get('fp'):
                    y.append('  client-fingerprint: ' + get('fp'))
            if get('alpn'):
                av = get("alpn").replace(",", '", "')
                y.append('  alpn: ["' + av + '"]')

        if get('type') == 'ws':
            y.append('  ws-opts:')
            if get('path'):
                y.append('    path: ' + get('path'))
            if get('host'):
                y.extend(['    headers:', '      Host: ' + get('host')])
        elif get('type') == 'grpc' and get('serviceName'):
            y.extend(['  grpc-opts:',
                      '    grpc-service-name: ' + get('serviceName')])
        return {"yaml": "\n".join(y), "name": name}, None
    except Exception as e:
        return None, str(e)


def parse_wireguard(config_text):
    """Convert a WireGuard/AmneziaWG ``.conf`` text into a Mihomo proxy snippet.

    The proxy name is ``WG-<comment>`` when the first line is a ``#`` comment,
    otherwise ``WG-WireGuard``. Returns ``({"yaml", "name"}, None)`` or
    ``(None, message)``.
    """
    try:
        sections = {}
        current_section = None
        for raw in config_text.splitlines():
            line = raw.strip()
            if not line or line.startswith('#'):
                continue
            if line.startswith('[') and line.endswith(']'):
                current_section = line[1:-1].lower()
                continue
            if '=' in line and current_section:
                key, value = map(str.strip, line.split('=', 1))
                sections.setdefault(current_section, {})[key] = value

        if 'interface' not in sections or 'peer' not in sections:
            return None, "Invalid WireGuard config: missing [Interface] or [Peer] section."
        interface = sections['interface']
        peer = sections['peer']

        missing = [k for k in ('PrivateKey', 'Address') if not interface.get(k)]
        missing += [k for k in ('PublicKey', 'Endpoint') if not peer.get(k)]
        if missing:
            return None, "Invalid WireGuard config: missing " + ", ".join(missing) + "."
        if ':' not in peer['Endpoint']:
            return None, "Invalid WireGuard config: Endpoint has no port."

        # Use the leading comment line, if any, as the display name.
        name = "WireGuard"
        name_match = re.search(r'#\s*(.+)', config_text.splitlines()[0])
        if name_match:
            name = name_match.group(1).strip()
        name = _NAME_UNSAFE.sub('', f"WG-{name}")

        server, port = peer['Endpoint'].rsplit(':', 1)
        server = server.strip('[]')  # bracketed IPv6 endpoint

        y = [
            f'- name: "{name}"',
            '  type: wireguard',
            f'  server: {server}',
            f'  port: {port}',
            f'  private-key: "{interface["PrivateKey"]}"',
            f'  public-key: "{peer["PublicKey"]}"',
            # Only the first address, without its prefix length.
            f'  ip: "{interface["Address"].split("/")[0]}"',
        ]
        if interface.get('DNS'):
            dns_servers = [d.strip() for d in interface.get('DNS').split(',')]
            y.append(f'  dns: {dns_servers}')
        if peer.get('PresharedKey'):
            y.append(f'  preshared-key: "{peer.get("PresharedKey")}"')
        if peer.get('PersistentKeepalive'):
            y.append(f'  keep-alive: {peer.get("PersistentKeepalive")}')

        amnezia_keys = ('Jc', 'Jmin', 'Jmax', 'S1', 'S2', 'H1', 'H2', 'H3', 'H4')
        amnezia_opts = {k: interface.get(k) for k in amnezia_keys}
        if any(v is not None for v in amnezia_opts.values()):
            y.append('  amnezia-wg-opts:')
            for key, value in amnezia_opts.items():
                if value is not None:
                    y.append(f'    {key.lower()}: {value}')
        return {"yaml": "\n".join(y), "name": name}, None
    except Exception as e:
        return None, str(e)


# --- CONFIG TEXT EDITING ---
def insert_proxy_logic(content, proxy_name, target_groups):
    """Add ``- "proxy_name"`` to the ``proxies:`` list of each targeted group.

    The config is edited line by line (no YAML parser). Inside a targeted
    group the entry goes right before the first DIRECT/REJECT item; if there
    is none it is appended when the list ends (next group, next top-level key
    or end of text). Returns the new text joined with ``\\n``.
    """
    entry = '- "' + proxy_name + '"'
    out = []
    inserted = set()
    in_groups = False     # inside the top-level proxy-groups section
    group = None          # name of the group currently being scanned
    in_list = False       # inside the proxies: list of a targeted group
    list_indent = -1      # indent of that proxies: key

    def add_entry(width):
        out.append(" " * width + entry)
        inserted.add(group)

    def pending():
        return in_list and group in target_groups and group not in inserted

    for line in content.splitlines():
        stripped = line.strip()
        indent = len(line) - len(line.lstrip())
        starts_group = stripped.startswith('- name:')

        if starts_group:
            # A new group closes the previous list.
            if pending():
                add_entry(list_indent + 2)
            in_list = False

        if stripped.startswith('proxy-groups:'):
            in_groups = True
        elif in_groups and indent == 0 and stripped and not stripped.startswith('#'):
            # Next top-level key: flush before the state is reset, otherwise
            # a list without DIRECT/REJECT in the last group gets nothing.
            if pending():
                add_entry(list_indent + 2)
            in_groups = False
            in_list = False
            group = None

        if in_groups:
            if starts_group:
                group = stripped.split(':', 1)[1].strip().strip("'").strip('"')
            if group in target_groups and stripped.startswith('proxies:'):
                in_list = True
                list_indent = indent
                out.append(line)
                continue
            if in_list:
                if not stripped or stripped.startswith('#'):
                    out.append(line)
                    continue
                if ('DIRECT' in stripped or 'REJECT' in stripped) and group not in inserted:
                    add_entry(indent)
                if indent <= list_indent:
                    if group not in inserted:
                        add_entry(list_indent + 2)
                    in_list = False
        out.append(line)

    if pending():
        out.append(" " * (list_indent + 2) + entry)
    return "\n".join(out)


def insert_proxy_definition(content, proxy_yaml):
    """Insert the *proxy_yaml* list item at the top of the ``proxies:`` section.

    Only the top-level (unindented) ``proxies:`` key is considered, so the
    ``proxies:`` lists inside proxy-groups are never mistaken for it. An empty
    flow list (``proxies: []``) is turned into a block list first. The new
    item uses the indent of the existing items, or two spaces by default.
    """
    lines = content.splitlines()
    block = proxy_yaml.splitlines()
    for idx, line in enumerate(lines):
        if not line.startswith('proxies:'):
            continue
        rest = line[len('proxies:'):].split('#', 1)[0].strip()
        if re.fullmatch(r'\[\s*\]', rest):
            lines[idx] = 'proxies:'
        pad = "  "
        for follower in lines[idx + 1:]:
            text = follower.strip()
            if not text or text.startswith('#'):
                continue
            if text.startswith('-'):
                pad = follower[:len(follower) - len(follower.lstrip())]
            break
        lines[idx + 1:idx + 1] = [pad + row for row in block]
        break
    else:
        lines.append("proxies:")
        lines.extend("  " + row for row in block)
    return "\n".join(lines)


def rename_proxy_in_config(content, old_name, new_name):
    """Rename a proxy in its definition and in every block-style list entry.

    The first ``name: old`` (bare, single- or double-quoted) and all
    ``- old`` list items are rewritten to the double-quoted *new_name*.
    Only whole values are matched.
    """
    escaped_old = re.escape(old_name)

    def replacement(match):
        # A callable keeps backslashes in new_name literal.
        return match.group(1) + '"' + new_name + '"'

    # The lookbehind stops "servername:" / "grpc-service-name:" from matching.
    pattern_def = (r"((?<![\w-])name\s*:\s*)(?P<quote>['\"]?)"
                   + escaped_old + r"(?P=quote)" + _VALUE_END)
    content = re.sub(pattern_def, replacement, content, count=1, flags=re.M)

    pattern_list = (r"(-\s+)(?P<quote>['\"]?)"
                    + escaped_old + r"(?P=quote)" + _VALUE_END)
    return re.sub(pattern_list, replacement, content, flags=re.M)


def find_panel_port(config_text):
    """Return the port of ``external-controller`` in *config_text*, or ''.

    Accepts quoted or bare ``host:port``, ``:port`` and bracketed IPv6 hosts;
    commented-out lines are ignored.
    """
    match = re.search(
        r"^[ \t]*external-controller:[ \t]*['\"]?[^\r\n'\"#]*:(\d+)",
        config_text, flags=re.M)
    return match.group(1) if match else ''


# NOTE(review): do_GET substitutes __JSON_CONTENT__ and __PROFILES__, but
# neither placeholder (nor any HTML markup) appears in the text below - the
# markup looks to have been stripped from this copy of the file. The text is
# kept exactly as found; restore the real template from the original source.
HTML_TEMPLATE = """ Mihomo Editor v18.4
✅ Успешно сохранено

Mihomo Studio

v18.4 Auto-Panel
Loaded: __TIME__

Профили

Управление прокси

Бэкапы

Оставить:
__BACKUPS__

Добавить в группы:

Удалить прокси

Консоль

...

Переименовать прокси

Выберите прокси для переименования:

Новое имя:

Новый профиль

Добавить прокси

Просмотр бэкапа


    
"""


class H(http.server.SimpleHTTPRequestHandler):
    """Request handler: editor page, POST actions and the panel reverse proxy."""

    def end_headers(self):
        """Disable client-side caching on every response."""
        self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate')
        self.send_header('Pragma', 'no-cache')
        self.send_header('Expires', '0')
        super().end_headers()

    # --- PAGE FRAGMENTS ---
    def get_bks(self):
        """Return the fragment listing the ten most recent backups."""
        # NOTE(review): the surrounding markup of these fragments is missing
        # in this copy of the file; the strings are kept as found.
        newest = sorted(glob.glob(BACKUP_DIR + "/*.yaml"),
                        key=os.path.getmtime, reverse=True)[:10]
        rows = []
        for path in newest:
            n = os.path.basename(path)
            t = datetime.fromtimestamp(os.path.getmtime(path)).strftime("%d.%m %H:%M")
            rows.append(f"\n{n}{t}\n")
        return "".join(rows) or '\nНет бэкапов\n'

    def get_prof_opts(self):
        """Return ``<option>`` elements for all profiles, marking the active one."""
        current = ""
        if os.path.exists(CONFIG_PATH):
            real = os.path.realpath(CONFIG_PATH)
            current = os.path.splitext(os.path.basename(real))[0]
        options = []
        for path in sorted(glob.glob(PROFILES_DIR + "/*.yaml")):
            name = os.path.splitext(os.path.basename(path))[0]
            selected = " selected" if name == current else ""
            label = html.escape(name, quote=True)
            options.append(f'<option value="{label}"{selected}>{label}</option>')
        return "".join(options)

    def get_panel_port(self):
        """Return the external-controller port from the active config, or ''."""
        try:
            with open(CONFIG_PATH, 'r', encoding='utf-8', errors='replace') as f:
                return find_panel_port(f.read())
        except OSError:
            return ''

    # --- PROXY LOGIC ---
    def proxy_pass(self, method):
        """Relay the current request to the controller on 127.0.0.1.

        The whole upstream body is read before replying, so the reply carries
        its own Content-Length and hop-by-hop headers are not forwarded.
        Upstream HTTP errors are relayed as-is; any other failure yields 502.
        """
        panel_port = self.get_panel_port()
        if not panel_port:
            self.send_error(500, "Panel port not found in config")
            return

        rel_path = self.path.replace('/mihomo_panel/', '', 1)
        target_url = f"http://127.0.0.1:{panel_port}/{rel_path}"
        try:
            content_len = int(self.headers.get('Content-Length') or 0)
        except ValueError:
            content_len = 0
        body = self.rfile.read(content_len) if content_len > 0 else None

        try:
            req = urllib.request.Request(target_url, data=body, method=method)
            for k, v in self.headers.items():
                if k.lower() not in _SKIP_REQUEST_HEADERS:
                    req.add_header(k, v)
            # The backend must see itself as the requested host.
            req.add_header('Host', f'127.0.0.1:{panel_port}')
            with urllib.request.urlopen(req) as resp:
                status, headers, payload = resp.status, resp.getheaders(), resp.read()
        except urllib.error.HTTPError as e:
            status, headers, payload = e.code, list(e.headers.items()), e.read()
        except Exception as e:
            # Answer instead of leaving the client without any response.
            self.send_error(502, "Bad Gateway", str(e))
            return

        try:
            self.send_response(status)
            for k, v in headers:
                if k.lower() not in _SKIP_RESPONSE_HEADERS:
                    self.send_header(k, v)
            self.send_header('Content-Length', str(len(payload)))
            self.end_headers()
            self.wfile.write(payload)
        except (BrokenPipeError, ConnectionResetError):
            pass  # client went away; nothing left to report to

    # --- HTTP VERBS ---
    def do_GET(self):
        """Serve the editor page at ``/``; relay ``/mihomo_panel/`` requests."""
        if self.path.startswith('/mihomo_panel/'):
            self.proxy_pass('GET')
            return
        if self.path != '/':
            return self.send_error(404)
        try:
            with open(CONFIG_PATH, encoding='utf-8', errors='replace') as f:
                config_text = f.read()
        except OSError:
            config_text = "proxies:\n"
        # "<\/" keeps the JSON valid while preventing a "</script>" inside the
        # config from closing an enclosing script element.
        embedded = json.dumps(config_text).replace('</', '<\\/')
        # The config goes in last so placeholder-like text inside it is not
        # substituted.
        page = (HTML_TEMPLATE
                .replace('__BACKUPS__', self.get_bks())
                .replace('__PROFILES__', self.get_prof_opts())
                .replace('__TIME__', datetime.now().strftime("%H:%M:%S"))
                .replace('__JSON_CONTENT__', embedded))
        self.send_response(200)
        self.send_header('Content-type', 'text/html;charset=utf-8')
        self.end_headers()
        self.wfile.write(page.encode('utf-8'))

    def do_POST(self):
        """Run the action named by the ``act`` form field; reply with JSON.

        The status is always 200; failures are reported as ``{"error": ...}``.
        The action runs before any header is sent, so an exception can no
        longer leave a 200 response with an empty body.
        """
        if self.path.startswith('/mihomo_panel/'):
            self.proxy_pass('POST')
            return
        try:
            length = int(self.headers.get('Content-Length') or 0)
        except ValueError:
            length = 0
        raw = self.rfile.read(length).decode('utf-8', 'ignore') if length > 0 else ''
        p = {k: v[0] for k, v in urllib.parse.parse_qs(raw).items()}

        handlers = {
            'switch_prof': self._act_switch_prof,
            'add_prof': self._act_add_prof,
            'del_prof': self._act_del_prof,
            'get_prof_content': self._act_get_prof_content,
            'rename_proxy': self._act_rename_proxy,
            'parse': self._act_parse,
            'add_wireguard': self._act_add_wireguard,
            'apply_insert': self._act_apply_insert,
            'clean_backups': self._act_clean_backups,
            'del_backup': self._act_del_backup,
            'rest': self._act_restore_backup,
            'view_backup': self._act_view_backup,
            'save': self._act_save,
            'restart': self._act_restart,
        }
        handler = handlers.get(p.get('act'))
        try:
            result = handler(p) if handler else {'error': 'Unknown action'}
        except Exception as e:
            result = {'error': str(e)}

        payload = json.dumps(result).encode('utf-8')
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def do_PUT(self):
        """Relay PUT for the panel; reject it elsewhere."""
        if self.path.startswith('/mihomo_panel/'):
            self.proxy_pass('PUT')
            return
        self.send_error(405, "Method Not Allowed")

    def do_DELETE(self):
        """Relay DELETE for the panel; reject it elsewhere."""
        if self.path.startswith('/mihomo_panel/'):
            self.proxy_pass('DELETE')
            return
        self.send_error(405, "Method Not Allowed")

    # --- PROFILE ACTIONS ---
    def _act_switch_prof(self, p):
        """Make the named profile the active config."""
        target = profile_path(p.get('name'))
        if not target or not os.path.exists(target):
            return {'error': 'Profile not found'}
        activate_profile(target)
        return {'status': 'ok'}

    def _act_add_prof(self, p):
        """Create a new profile; activate it when no config exists yet."""
        target = profile_path(p.get('name'))
        if not target:
            return {'error': 'Invalid profile name'}
        if os.path.exists(target):
            return {'error': 'Профиль с таким именем уже существует'}
        with open(target, 'w', encoding='utf-8') as f:
            f.write(p.get('content', ''))
        if not os.path.exists(CONFIG_PATH):
            activate_profile(target)
        return {'status': 'ok'}

    def _act_del_prof(self, p):
        """Delete a profile unless it is the active one."""
        target = profile_path(p.get('name'))
        if not target:
            return {'error': 'File not found'}
        if os.path.realpath(target) == os.path.realpath(CONFIG_PATH):
            return {'error': 'Нельзя удалить активный профиль. Сначала переключитесь на другой.'}
        if not os.path.exists(target):
            return {'error': 'File not found'}
        os.remove(target)
        return {'status': 'ok'}

    def _act_get_prof_content(self, p):
        """Return the text of the named profile."""
        target = profile_path(p.get('name'))
        if not target or not os.path.exists(target):
            return {'error': 'Profile not found'}
        with open(target, 'r', encoding='utf-8') as f:
            return {'status': 'ok', 'content': f.read()}

    # --- PROXY ACTIONS ---
    def _act_rename_proxy(self, p):
        """Rename a proxy inside the submitted config text."""
        old_name = p.get('old_name')
        new_name = p.get('new_name')
        content = p.get('content', '')
        if not all([old_name, new_name, content]):
            return {'error': 'Missing parameters'}
        return {'status': 'ok',
                'new_content': rename_proxy_in_config(content, old_name, new_name)}

    def _act_parse(self, p):
        """Parse a vless:// link into a proxy snippet."""
        data, err = parse_vless(p.get('link', ''))
        return data if data else {'error': err}

    def _act_add_wireguard(self, p):
        """Parse a WireGuard config into a proxy snippet."""
        config_text = p.get('config_text', '')
        if not config_text:
            return {'error': 'Empty config'}
        proxy_data, err = parse_wireguard(config_text)
        return {'error': err} if err else proxy_data

    def _act_apply_insert(self, p):
        """Insert a proxy definition and reference it from the chosen groups."""
        targets = json.loads(p.get('targets', '[]'))
        with_proxy = insert_proxy_definition(p.get('content', ''),
                                             p.get('proxy_yaml', ''))
        return {'new_content': insert_proxy_logic(
            with_proxy, p.get('proxy_name', ''), targets)}

    # --- BACKUP ACTIONS ---
    def _act_clean_backups(self, p):
        """Keep only the ``limit`` newest backups."""
        limit = int(p.get('limit', 5))
        if limit < 0:
            # A negative slice start would silently pick unrelated files.
            return {'error': 'Invalid limit'}
        files = sorted(glob.glob(BACKUP_DIR + "/*.yaml"),
                       key=os.path.getmtime, reverse=True)
        for stale in files[limit:]:
            try:
                os.remove(stale)
            except OSError:
                pass  # best effort: one undeletable file must not stop cleanup
        return {'backups': self.get_bks()}

    def _act_del_backup(self, p):
        """Delete one backup file."""
        path = backup_path(p.get('f'))
        if path and os.path.isfile(path):
            os.remove(path)
        return {'backups': self.get_bks()}

    def _act_restore_backup(self, p):
        """Copy a backup over the active config."""
        path = backup_path(p.get('f'))
        if not path or not os.path.isfile(path):
            return {'error': 'File not found'}
        shutil.copy(path, CONFIG_PATH)
        return {'status': 'ok'}

    def _act_view_backup(self, p):
        """Return the text of one backup file."""
        path = backup_path(p.get('f'))
        if not path or not os.path.isfile(path):
            return {'error': 'File not found'}
        with open(path, 'r', encoding='utf-8') as f:
            return {'content': f.read()}

    # --- SAVE / RESTART ---
    def _write_config(self, p):
        """Back up the active config, then write the submitted content to it."""
        new_content = p.get('content', '').replace('\r\n', '\n')
        if os.path.exists(CONFIG_PATH):
            real_name = os.path.basename(os.path.realpath(CONFIG_PATH))
            prof_n = os.path.splitext(real_name)[0]
            ts = datetime.now().strftime('%Y%m%d_%H%M%S')
            shutil.copy(CONFIG_PATH, f"{BACKUP_DIR}/{prof_n}_{ts}.yaml")
        with open(CONFIG_PATH, 'w', encoding='utf-8') as f:
            f.write(new_content)
            f.flush()
            os.fsync(f.fileno())

    def _act_save(self, p):
        """Save the config and return the refreshed backup list."""
        self._write_config(p)
        return {'status': 'ok',
                'time': datetime.now().strftime("%H:%M:%S"),
                'backups': self.get_bks()}

    def _act_restart(self, p):
        """Save the config, run the restart command and return its log."""
        self._write_config(p)
        my_env = os.environ.copy()
        my_env["TERM"] = "xterm-256color"
        # RESTART_CMD is a fixed string; the shell is needed for redirection.
        subprocess.run(RESTART_CMD, shell=True, env=my_env)
        try:
            with open(LOG_FILE, encoding='utf-8', errors='replace') as f:
                log = f.read()
        except OSError:
            log = "Log empty"
        return {'log': log}


def main():
    """Prepare the directory layout and serve until interrupted."""
    ensure_layout()
    try:
        socketserver.TCPServer.allow_reuse_address = True
        socketserver.TCPServer(("", PORT), H).serve_forever()
    except Exception as e:
        print(e)


if __name__ == "__main__":
    main()