# !/opt/bin/python3 # -*- coding: utf-8 -*- import http.server import socketserver import os import subprocess import urllib.parse import urllib.request import urllib.error import re import time import shutil import glob import json from datetime import datetime # --- НАСТРОЙКИ --- PORT = 8888 CONFIG_DIR = "/opt/etc/mihomo" CONFIG_PATH = os.path.join(CONFIG_DIR, "config.yaml") PROFILES_DIR = os.path.join(CONFIG_DIR, "profiles") BACKUP_DIR = os.path.join(CONFIG_DIR, "backup") LOG_FILE = "/tmp/mihomo_last_restart.log" RESTART_CMD = "xkeen -restart > " + LOG_FILE + " 2>&1" # --- ИНИЦИАЛИЗАЦИЯ --- if not os.path.exists(BACKUP_DIR): os.makedirs(BACKUP_DIR) if not os.path.exists(PROFILES_DIR): os.makedirs(PROFILES_DIR) if os.path.exists(CONFIG_PATH) and not os.path.islink(CONFIG_PATH): shutil.move(CONFIG_PATH, os.path.join(PROFILES_DIR, "default.yaml")) os.symlink(os.path.join(PROFILES_DIR, "default.yaml"), CONFIG_PATH) elif not os.path.exists(CONFIG_PATH): def_prof = os.path.join(PROFILES_DIR, "default.yaml") with open(def_prof, 'w') as f: f.write("proxies: []\n") os.symlink(def_prof, CONFIG_PATH) # --- ПАРСЕРЫ --- def parse_vless(link): try: if not link.startswith("vless://"): return None, "Link error" main = link[8:] name = "VLESS" if '#' in main: main, n = main.split('#', 1); name = urllib.parse.unquote(n).strip() name = re.sub(r'[\[\]\{\}\"\']', '', name) user_srv = main.split('?')[0] params = urllib.parse.parse_qs(main.split('?')[1]) if '?' in main else {} if '@' in user_srv: uuid, srv_port = user_srv.split('@', 1) else: return None, "No UUID" if ':' in srv_port: if ']' in srv_port: srv, port = srv_port.rsplit(':', 1); srv = srv.replace('[', '').replace(']', '') else: srv, port = srv_port.split(':') else: return None, "No Port" def get(k): return params.get(k, [''])[0] y = ['- name: "' + name + '"', ' type: vless', ' server: ' + srv, ' port: ' + port, ' uuid: ' + uuid, ' udp: true'] y.append(' network: ' + (get('type') or 'tcp')) if get('flow'): y.append(' flow: ' + get('flow')) if get('security'): y.append(' tls: true') if get('security') == 'reality': y.extend([' servername: ' + get('sni'), ' client-fingerprint: ' + (get('fp') or 'chrome'), ' reality-opts:', ' public-key: ' + get('pbk')]) if get('sid'): y.append(' short-id: ' + get('sid')) else: if get('sni'): y.append(' servername: ' + get('sni')) if get('fp'): y.append(' client-fingerprint: ' + get('fp')) if get('alpn'): av = get("alpn").replace(",", '", "') y.append(' alpn: ["' + av + '"]') if get('type') == 'ws': y.append(' ws-opts:') if get('path'): y.append(' path: ' + get('path')) if get('host'): y.extend([' headers:', ' Host: ' + get('host')]) elif get('type') == 'grpc' and get('serviceName'): y.extend([' grpc-opts:', ' grpc-service-name: ' + get('serviceName')]) return {"yaml": "\n".join(y), "name": name}, None except Exception as e: return None, str(e) def insert_proxy_logic(content, proxy_name, target_groups): lines = content.splitlines() new_lines = [] def get_indent(s): return len(s) - len(s.lstrip()) in_group_section = False current_group_name = None in_proxies_list = False proxies_list_indent = -1 inserted_in_group = set() for i, line in enumerate(lines): stripped = line.strip() indent = get_indent(line) is_new_group = stripped.startswith('- name:') if is_new_group: if in_proxies_list and current_group_name in target_groups and current_group_name not in inserted_in_group: prefix = " " * (proxies_list_indent + 2) new_lines.append(prefix + '- "' + proxy_name + '"') inserted_in_group.add(current_group_name) in_proxies_list = False if stripped.startswith('proxy-groups:'): in_group_section = True elif in_group_section and indent == 0 and stripped and not stripped.startswith('#'): in_group_section = False in_proxies_list = False current_group_name = None if in_group_section: if is_new_group: raw_name = stripped.split(':', 1)[1].strip() current_group_name = raw_name.strip("'").strip('"') if current_group_name in target_groups and stripped.startswith('proxies:'): in_proxies_list = True proxies_list_indent = indent new_lines.append(line) continue if in_proxies_list: if not stripped or stripped.startswith('#'): new_lines.append(line) continue if ('DIRECT' in stripped or 'REJECT' in stripped) and current_group_name not in inserted_in_group: prefix = " " * indent new_lines.append(prefix + '- "' + proxy_name + '"') inserted_in_group.add(current_group_name) if indent <= proxies_list_indent: if current_group_name not in inserted_in_group: prefix = " " * (proxies_list_indent + 2) new_lines.append(prefix + '- "' + proxy_name + '"') inserted_in_group.add(current_group_name) in_proxies_list = False new_lines.append(line) if in_proxies_list and current_group_name in target_groups and current_group_name not in inserted_in_group: prefix = " " * (proxies_list_indent + 2) new_lines.append(prefix + '- "' + proxy_name + '"') return "\n".join(new_lines) HTML_TEMPLATE = """
Выберите прокси для переименования:
Новое имя:
['"]?) - захватывает кавычку (если она есть) в группу 'quote' # \\1 - ссылается на захваченную кавычку, чтобы заменить на такую же pattern_def = r"(name\s*:\s*)(?P['\"]?)" + escaped_old + r"(?P=quote)" # Заменяем, сохраняя оригинальные кавычки content = re.sub(pattern_def, r'\g<1>"' + new_name + '"', content, count=1) # 2. Замена в списках proxy-groups: - "old_name" # Regex для поиска `- 'old_name'`, `- "old_name"` или `- old_name` pattern_list = r"(-\s+)(?P['\"]?)" + escaped_old + r"(?P=quote)" content = re.sub(pattern_list, r'\g<1>"' + new_name + '"', content) s.wfile.write(json.dumps({'status': 'ok', 'new_content': content}).encode('utf-8')) return # --- EXISTING ACTIONS --- if a == 'parse': d, e = parse_vless(p.get('link', '')) s.wfile.write(json.dumps(d if d else {'error': e}).encode('utf-8')); return if a == 'apply_insert': content = p.get('content', ''); p_name = p.get('proxy_name', ''); p_yaml = p.get('proxy_yaml', ''); targets = json.loads(p.get('targets', '[]')) lines = content.splitlines(); inserted = False for i, line in enumerate(lines): if line.strip().startswith('proxies:'): blk = p_yaml.splitlines(); for bi, bl in enumerate(blk): lines.insert(i + 1 + bi, " " + bl) inserted = True; break if not inserted: lines.append("proxies:"); lines.extend([" " + l for l in p_yaml.splitlines()]) uc = insert_proxy_logic("\n".join(lines), p_name, targets) s.wfile.write(json.dumps({'new_content': uc}).encode('utf-8')); return if a == 'clean_backups': limit = int(p.get('limit', 5)) files = sorted(glob.glob(BACKUP_DIR + "/*.yaml"), key=os.path.getmtime, reverse=True) if len(files) > limit: for f in files[limit:]: try: os.remove(f) except: pass s.wfile.write(json.dumps({'backups': s.get_bks()}).encode('utf-8')); return if a == 'del_backup': fname = p.get('f') path = os.path.join(BACKUP_DIR, os.path.basename(fname)) if os.path.exists(path): os.remove(path) s.wfile.write(json.dumps({'backups': s.get_bks()}).encode('utf-8')); return if a == 'rest': shutil.copy(os.path.join(BACKUP_DIR, os.path.basename(p.get('f'))), CONFIG_PATH) s.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8')); return if a == 'view_backup': fname = p.get('f') path = os.path.join(BACKUP_DIR, os.path.basename(fname)) if os.path.exists(path): with open(path, 'r', encoding='utf-8') as f: content = f.read() s.wfile.write(json.dumps({'content': content}).encode('utf-8')) else: s.wfile.write(json.dumps({'error': 'File not found'}).encode('utf-8')) return new_c = p.get('content', '').replace('\r\n', '\n') if a in ['save', 'restart']: if os.path.exists(CONFIG_PATH): real_p = os.path.basename(os.path.realpath(CONFIG_PATH)) prof_n = os.path.splitext(real_p)[0] ts = datetime.now().strftime('%Y%m%d_%H%M%S') shutil.copy(CONFIG_PATH, f"{BACKUP_DIR}/{prof_n}_{ts}.yaml") with open(CONFIG_PATH, 'w') as f: f.write(new_c); f.flush(); os.fsync(f.fileno()) if a == 'restart': my_env = os.environ.copy(); my_env["TERM"] = "xterm-256color" subprocess.run(RESTART_CMD, shell=True, env=my_env) log = open(LOG_FILE).read() if os.path.exists(LOG_FILE) else "Log empty" s.wfile.write(json.dumps({'log': log}).encode('utf-8')) elif a == 'save': s.wfile.write(json.dumps( {'status': 'ok', 'time': datetime.now().strftime("%H:%M:%S"), 'backups': s.get_bks()}).encode('utf-8')) def do_PUT(s): if s.path.startswith('/mihomo_panel/'): s.proxy_pass('PUT') return s.send_error(405, "Method Not Allowed") def do_DELETE(s): if s.path.startswith('/mihomo_panel/'): s.proxy_pass('DELETE') return s.send_error(405, "Method Not Allowed") try: socketserver.TCPServer.allow_reuse_address = True socketserver.TCPServer(("", PORT), H).serve_forever() except Exception as e: print(e)