# !/opt/bin/python3 # -*- coding: utf-8 -*- import http.server import socketserver import os import subprocess import urllib.parse import urllib.request import urllib.error import re import time import shutil import glob import json import yaml from datetime import datetime # --- НАСТРОЙКИ --- PORT = 8888 CONFIG_DIR = "/opt/etc/mihomo" CONFIG_PATH = os.path.join(CONFIG_DIR, "config.yaml") PROFILES_DIR = os.path.join(CONFIG_DIR, "profiles") BACKUP_DIR = os.path.join(CONFIG_DIR, "backup") LOG_FILE = "/tmp/mihomo_last_restart.log" RESTART_CMD = "xkeen -restart > " + LOG_FILE + " 2>&1" # --- ИНИЦИАЛИЗАЦИЯ --- if not os.path.exists(BACKUP_DIR): os.makedirs(BACKUP_DIR) if not os.path.exists(PROFILES_DIR): os.makedirs(PROFILES_DIR) if os.path.exists(CONFIG_PATH) and not os.path.islink(CONFIG_PATH): shutil.move(CONFIG_PATH, os.path.join(PROFILES_DIR, "default.yaml")) os.symlink(os.path.join(PROFILES_DIR, "default.yaml"), CONFIG_PATH) elif not os.path.exists(CONFIG_PATH): def_prof = os.path.join(PROFILES_DIR, "default.yaml") with open(def_prof, 'w') as f: f.write("proxies: []\n") os.symlink(def_prof, CONFIG_PATH) # --- ПАРСЕРЫ --- def parse_vless(link, custom_name=None): try: if not link.startswith("vless://"): return None, "Link error" main = link[8:] name = "VLESS" if custom_name: name = custom_name elif '#' in main: main, n = main.split('#', 1) name = urllib.parse.unquote(n).strip() name = re.sub(r'[\[\]\{\}\"\']', '', name) user_srv = main.split('?')[0] params = urllib.parse.parse_qs(main.split('?')[1]) if '?' in main else {} if '@' in user_srv: uuid, srv_port = user_srv.split('@', 1) else: return None, "No UUID" if ':' in srv_port: if ']' in srv_port: srv, port = srv_port.rsplit(':', 1); srv = srv.replace('[', '').replace(']', '') else: srv, port = srv_port.split(':') else: return None, "No Port" def get(k): return params.get(k, [''])[0] y = ['- name: "' + name + '"', ' type: vless', ' server: ' + srv, ' port: ' + port, ' uuid: ' + uuid, ' udp: true'] y.append(' network: ' + (get('type') or 'tcp')) if get('flow'): y.append(' flow: ' + get('flow')) if get('security'): y.append(' tls: true') if get('security') == 'reality': y.extend([' servername: ' + get('sni'), ' client-fingerprint: ' + (get('fp') or 'chrome'), ' reality-opts:', ' public-key: ' + get('pbk')]) if get('sid'): y.append(' short-id: ' + get('sid')) else: if get('sni'): y.append(' servername: ' + get('sni')) if get('fp'): y.append(' client-fingerprint: ' + get('fp')) if get('alpn'): av = get("alpn").replace(",", '", "') y.append(' alpn: ["' + av + '"]') if get('type') == 'ws': y.append(' ws-opts:') if get('path'): y.append(' path: ' + get('path')) if get('host'): y.extend([' headers:', ' Host: ' + get('host')]) elif get('type') == 'grpc' and get('serviceName'): y.extend([' grpc-opts:', ' grpc-service-name: ' + get('serviceName')]) return {"yaml": "\n".join(y), "name": name}, None except Exception as e: return None, str(e) def parse_wireguard(config_text, custom_name=None): try: name = "WireGuard" params = {} current_section = None for line in config_text.splitlines(): line = line.strip() if not line or line.startswith('#'): continue if line.startswith('[') and line.endswith(']'): current_section = line[1:-1].lower() continue if '=' in line: key, value = map(str.strip, line.split('=', 1)) if current_section: if current_section not in params: params[current_section] = {} params[current_section][key] = value if 'interface' not in params or 'peer' not in params: return None, "Invalid WireGuard config: missing [Interface] or [Peer] section." interface = params.get('interface', {}) peer = params.get('peer', {}) server, port_str = peer.get('Endpoint', ':').rsplit(':', 1) if custom_name: name = custom_name else: name_match = re.search(r'#\s*(.+)', config_text.splitlines()[0]) if name_match: name = name_match.group(1).strip() elif server: name = f"WG_{server}" else: name = "WireGuard" # Преобразуем порт в int, если возможно, иначе оставляем как есть try: port = int(port_str) except (ValueError, TypeError): port = port_str ip_address = interface.get("Address", "").split("/")[0] proxy_dict = { 'name': name, 'type': 'wireguard', 'server': server, 'port': port, 'private-key': interface.get("PrivateKey"), 'public-key': peer.get("PublicKey"), 'ip': ip_address } if interface.get('DNS'): proxy_dict['dns'] = [d.strip() for d in interface.get('DNS').split(',')] if peer.get('AllowedIPs'): proxy_dict['allowed-ips'] = [ip.strip() for ip in peer.get('AllowedIPs').split(',')] if peer.get('PresharedKey'): proxy_dict['preshared-key'] = peer.get("PresharedKey") if peer.get('PersistentKeepalive'): proxy_dict['keep-alive'] = int(peer.get("PersistentKeepalive")) amnezia_keys = ['Jc', 'Jmin', 'Jmax', 'S1', 'S2', 'H1', 'H2', 'H3', 'H4'] amnezia_opts = {key: interface.get(key) for key in amnezia_keys if interface.get(key) is not None} if amnezia_opts: proxy_dict['amnezia-wg-option'] = {k: v for k, v in amnezia_opts.items()} # Создаем YAML, который будет добавлен в `proxies:` # Используем NoQuoteDumper для вывода без кавычек, где это возможно yaml_string = yaml.dump([proxy_dict], default_flow_style=False, sort_keys=False, allow_unicode=True, indent=2) # Удаляем `- ` с первой строки, т.к. мы добавляем только один прокси за раз # и обертка в список нужна только для yaml.dump final_yaml = yaml_string.replace('- ', ' ', 1).replace(' name:', '- name:') return {"yaml": final_yaml.strip(), "name": name}, None except Exception as e: return None, str(e) def insert_proxy_logic(content, proxy_name, target_groups): lines = content.splitlines() new_lines = [] def get_indent(s): return len(s) - len(s.lstrip()) in_group_section = False current_group_name = None in_proxies_list = False proxies_list_indent = -1 inserted_in_group = set() for i, line in enumerate(lines): stripped = line.strip() indent = get_indent(line) is_new_group = stripped.startswith('- name:') if is_new_group: if in_proxies_list and current_group_name in target_groups and current_group_name not in inserted_in_group: prefix = " " * (proxies_list_indent + 2) new_lines.append(prefix + '- "' + proxy_name + '"') inserted_in_group.add(current_group_name) in_proxies_list = False if stripped.startswith('proxy-groups:'): in_group_section = True elif in_group_section and indent == 0 and stripped and not stripped.startswith('#'): in_group_section = False in_proxies_list = False current_group_name = None if in_group_section: if is_new_group: raw_name = stripped.split(':', 1)[1].strip() current_group_name = raw_name.strip("'").strip('"') if current_group_name in target_groups and stripped.startswith('proxies:'): in_proxies_list = True proxies_list_indent = indent new_lines.append(line) continue if in_proxies_list: if not stripped or stripped.startswith('#'): new_lines.append(line) continue if ('DIRECT' in stripped or 'REJECT' in stripped) and current_group_name not in inserted_in_group: prefix = " " * indent new_lines.append(prefix + '- "' + proxy_name + '"') inserted_in_group.add(current_group_name) if indent <= proxies_list_indent: if current_group_name not in inserted_in_group: prefix = " " * (proxies_list_indent + 2) new_lines.append(prefix + '- "' + proxy_name + '"') inserted_in_group.add(current_group_name) in_proxies_list = False new_lines.append(line) if in_proxies_list and current_group_name in target_groups and current_group_name not in inserted_in_group: prefix = " " * (proxies_list_indent + 2) new_lines.append(prefix + '- "' + proxy_name + '"') return "\n".join(new_lines) HTML_TEMPLATE = """ Mihomo Editor v18.4
✅ Успешно сохранено

Mihomo Studio

v18.4 Auto-Panel
Loaded: __TIME__

Профили

Управление прокси

Бэкапы

Оставить:
__BACKUPS__

Добавить в группы:

Удалить прокси

Консоль

...

Переименовать прокси

Выберите прокси для переименования:

Новое имя:

Новый профиль

Добавить прокси

Просмотр бэкапа


    
""" class H(http.server.SimpleHTTPRequestHandler): def end_headers(s): s.send_header('Cache-Control', 'no-store, no-cache, must-revalidate'); s.send_header('Pragma', 'no-cache'); s.send_header( 'Expires', '0'); super().end_headers() def get_bks(s): b = "" for f in sorted(glob.glob(BACKUP_DIR + "/*.yaml"), key=os.path.getmtime, reverse=True)[:10]: n = os.path.basename(f); t = datetime.fromtimestamp(os.path.getmtime(f)).strftime("%d.%m %H:%M") b += f'''
{n}{t}
''' if not b: b = '
Нет бэкапов
' return b def get_prof_opts(s): curr = "" if os.path.exists(CONFIG_PATH): real = os.path.realpath(CONFIG_PATH) curr = os.path.splitext(os.path.basename(real))[0] opts = "" files = sorted(glob.glob(PROFILES_DIR + "/*.yaml")) for f in files: n = os.path.splitext(os.path.basename(f))[0] sel = "selected" if n == curr else "" opts += f'' return opts def get_panel_port(self): panel_port = '' try: with open(CONFIG_PATH, 'r') as f: config_content = f.read() # Улучшенный regex для поиска порта (учитывает кавычки и IP) # Ищет external-controller: "0.0.0.0:9090" или '127.0.0.1:9090' или просто :9090 match = re.search(r"external-controller:\s*(?:['\"]?)(?:[^:]*):(\d+)(?:['\"]?)", config_content) if match: panel_port = match.group(1) except (IOError, FileNotFoundError): pass return panel_port # --- PROXY LOGIC --- def proxy_pass(self, method): panel_port = self.get_panel_port() if not panel_port: self.send_error(500, "Panel port not found in config") return # Strip prefix rel_path = self.path.replace('/mihomo_panel/', '', 1) target_url = f"http://127.0.0.1:{panel_port}/{rel_path}" # Read Body content_len = int(self.headers.get('Content-Length', 0)) body = self.rfile.read(content_len) if content_len > 0 else None # Create Request try: req = urllib.request.Request(target_url, data=body, method=method) for k, v in self.headers.items(): if k.lower() not in ['host', 'origin', 'referer']: req.add_header(k, v) # Важно: подменяем Host для корректной работы backend req.add_header('Host', f'127.0.0.1:{panel_port}') with urllib.request.urlopen(req) as resp: self.send_response(resp.status) for k, v in resp.getheaders(): # Фильтруем CORS заголовки от backend, т.к. мы их сами выставим если надо, # но здесь мы действуем как same-origin if k.lower() not in ['access-control-allow-origin', 'server', 'date']: self.send_header(k, v) self.end_headers() self.wfile.write(resp.read()) except urllib.error.HTTPError as e: self.send_response(e.code) for k, v in e.headers.items(): self.send_header(k, v) self.end_headers() self.wfile.write(e.read()) except Exception as e: # self.send_error(500, str(e)) pass # Silent fail to avoid crashing def do_GET(s): if s.path.startswith('/mihomo_panel/'): s.proxy_pass('GET') return if s.path != '/': return s.send_error(404) c = open(CONFIG_PATH).read() if os.path.exists(CONFIG_PATH) else "proxies:\n" s.send_response(200); s.send_header('Content-type', 'text/html;charset=utf-8'); s.end_headers() out = HTML_TEMPLATE.replace('__JSON_CONTENT__', json.dumps(c)) \ .replace('__BACKUPS__', s.get_bks()) \ .replace('__PROFILES__', s.get_prof_opts()) \ .replace('__TIME__', datetime.now().strftime("%H:%M:%S")) s.wfile.write(out.encode('utf-8')) def do_POST(s): if s.path.startswith('/mihomo_panel/'): s.proxy_pass('POST') return l = int(s.headers['Content-Length']); d = s.rfile.read(l).decode('utf-8', 'ignore') p = {k: v[0] for k, v in urllib.parse.parse_qs(d).items()}; a = p.get('act') s.send_response(200); s.send_header('Content-Type', 'application/json'); s.end_headers() # --- PROFILE ACTIONS --- if a == 'switch_prof': n = p.get('name') target = os.path.join(PROFILES_DIR, n + ".yaml") if os.path.exists(target): if os.path.exists(CONFIG_PATH) or os.path.islink(CONFIG_PATH): os.unlink(CONFIG_PATH) os.symlink(target, CONFIG_PATH) s.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8')) else: s.wfile.write(json.dumps({'error': 'Profile not found'}).encode('utf-8')) return if a == 'add_prof': n = p.get('name') c = p.get('content', '') target = os.path.join(PROFILES_DIR, n + ".yaml") if os.path.exists(target): s.wfile.write(json.dumps({'error': 'Профиль с таким именем уже существует'}).encode('utf-8')) else: with open(target, 'w') as f: f.write(c) if not os.path.exists(CONFIG_PATH): os.symlink(target, CONFIG_PATH) s.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8')) return if a == 'del_prof': n = p.get('name') target = os.path.join(PROFILES_DIR, n + ".yaml") real_curr = os.path.realpath(CONFIG_PATH) if os.path.realpath(target) == real_curr: s.wfile.write( json.dumps({'error': 'Нельзя удалить активный профиль. Сначала переключитесь на другой.'}).encode( 'utf-8')) elif os.path.exists(target): os.remove(target) s.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8')) else: s.wfile.write(json.dumps({'error': 'File not found'}).encode('utf-8')) return if a == 'get_prof_content': n = p.get('name') target = os.path.join(PROFILES_DIR, n + ".yaml") if os.path.exists(target): with open(target, 'r', encoding='utf-8') as f: content = f.read() s.wfile.write(json.dumps({'status': 'ok', 'content': content}).encode('utf-8')) else: s.wfile.write(json.dumps({'error': 'Profile not found'}).encode('utf-8')) return if a == 'rename_proxy': old_name = p.get('old_name') new_name = p.get('new_name') content = p.get('content', '') if not all([old_name, new_name, content]): s.wfile.write(json.dumps({'error': 'Missing parameters'}).encode('utf-8')) return # 1. Замена в определении прокси: - name: "old_name" # Regex для поиска `name: 'old_name'`, `name: "old_name"` или `name: old_name` # Используем `re.escape` для безопасности escaped_old = re.escape(old_name) # (?P['"]?) - захватывает кавычку (если она есть) в группу 'quote' # \\1 - ссылается на захваченную кавычку, чтобы заменить на такую же pattern_def = r"(name\s*:\s*)(?P['\"]?)" + escaped_old + r"(?P=quote)" # Заменяем, сохраняя оригинальные кавычки content = re.sub(pattern_def, r'\g<1>"' + new_name + '"', content, count=1) # 2. Замена в списках proxy-groups: - "old_name" # Regex для поиска `- 'old_name'`, `- "old_name"` или `- old_name` pattern_list = r"(-\s+)(?P['\"]?)" + escaped_old + r"(?P=quote)" content = re.sub(pattern_list, r'\g<1>"' + new_name + '"', content) s.wfile.write(json.dumps({'status': 'ok', 'new_content': content}).encode('utf-8')) return # --- EXISTING ACTIONS --- if a == 'parse': link = p.get('link', '') custom_name = p.get('proxy_name') d, e = parse_vless(link, custom_name) s.wfile.write(json.dumps(d if d else {'error': e}).encode('utf-8')); return if a == 'add_wireguard': config_text = p.get('config_text', '') custom_name = p.get('proxy_name') if not config_text: s.wfile.write(json.dumps({'error': 'Empty config'}).encode('utf-8')) return proxy_data, err = parse_wireguard(config_text, custom_name) if err: s.wfile.write(json.dumps({'error': err}).encode('utf-8')) return s.wfile.write(json.dumps(proxy_data).encode('utf-8')) return if a == 'apply_insert': content = p.get('content', ''); p_name = p.get('proxy_name', ''); p_yaml = p.get('proxy_yaml', ''); targets = json.loads(p.get('targets', '[]')) lines = content.splitlines(); inserted = False for i, line in enumerate(lines): if line.strip().startswith('proxies:'): blk = p_yaml.splitlines(); for bi, bl in enumerate(blk): lines.insert(i + 1 + bi, " " + bl) inserted = True; break if not inserted: lines.append("proxies:"); lines.extend([" " + l for l in p_yaml.splitlines()]) uc = insert_proxy_logic("\n".join(lines), p_name, targets) s.wfile.write(json.dumps({'new_content': uc}).encode('utf-8')); return if a == 'clean_backups': limit = int(p.get('limit', 5)) files = sorted(glob.glob(BACKUP_DIR + "/*.yaml"), key=os.path.getmtime, reverse=True) if len(files) > limit: for f in files[limit:]: try: os.remove(f) except: pass s.wfile.write(json.dumps({'backups': s.get_bks()}).encode('utf-8')); return if a == 'del_backup': fname = p.get('f') path = os.path.join(BACKUP_DIR, os.path.basename(fname)) if os.path.exists(path): os.remove(path) s.wfile.write(json.dumps({'backups': s.get_bks()}).encode('utf-8')); return if a == 'rest': shutil.copy(os.path.join(BACKUP_DIR, os.path.basename(p.get('f'))), CONFIG_PATH) s.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8')); return if a == 'view_backup': fname = p.get('f') path = os.path.join(BACKUP_DIR, os.path.basename(fname)) if os.path.exists(path): with open(path, 'r', encoding='utf-8') as f: content = f.read() s.wfile.write(json.dumps({'content': content}).encode('utf-8')) else: s.wfile.write(json.dumps({'error': 'File not found'}).encode('utf-8')) return new_c = p.get('content', '').replace('\r\n', '\n') if a in ['save', 'restart']: if os.path.exists(CONFIG_PATH): real_p = os.path.basename(os.path.realpath(CONFIG_PATH)) prof_n = os.path.splitext(real_p)[0] ts = datetime.now().strftime('%Y%m%d_%H%M%S') shutil.copy(CONFIG_PATH, f"{BACKUP_DIR}/{prof_n}_{ts}.yaml") with open(CONFIG_PATH, 'w') as f: f.write(new_c); f.flush(); os.fsync(f.fileno()) if a == 'restart': my_env = os.environ.copy(); my_env["TERM"] = "xterm-256color" subprocess.run(RESTART_CMD, shell=True, env=my_env) log = open(LOG_FILE).read() if os.path.exists(LOG_FILE) else "Log empty" s.wfile.write(json.dumps({'log': log}).encode('utf-8')) elif a == 'save': s.wfile.write(json.dumps( {'status': 'ok', 'time': datetime.now().strftime("%H:%M:%S"), 'backups': s.get_bks()}).encode('utf-8')) def do_PUT(s): if s.path.startswith('/mihomo_panel/'): s.proxy_pass('PUT') return s.send_error(405, "Method Not Allowed") def do_DELETE(s): if s.path.startswith('/mihomo_panel/'): s.proxy_pass('DELETE') return s.send_error(405, "Method Not Allowed") try: socketserver.TCPServer.allow_reuse_address = True; socketserver.TCPServer(("", PORT), H).serve_forever() except Exception as e: print(e)