Putting the PM2 "Supervisor" on Duty

I. Installing the "Supervisor" (PM2)

1. In the Termux terminal, copy and run the following two commands (they install the Node.js environment and PM2):

pkg install nodejs -y
npm install -g pm2

2. Overhaul start.sh

cat > ~/start.sh << 'EOF'
#!/bin/bash
echo "🚀 正在唤醒矩阵服务 (修正版)..."

# 1. Start OpenList (with the required `server` argument)
cd ~/openlist && pm2 start ./openlist --name "openlist" -- server

# 2. Start the Cloudflare tunnel (arguments after `--` go to cloudflared, not to pm2)
pm2 start cloudflared --name "cloudflared" -- tunnel --protocol http2 --config /data/data/com.termux/files/home/.cloudflared/config.yml run emby_tunnel

# 3. Start the FRP tunnel
pm2 start /data/data/com.termux/files/home/frp_0.56.0_linux_arm64/frpc --name "frpc" -- -c /data/data/com.termux/files/home/frp_0.56.0_linux_arm64/frpc.toml

# 4. Start Aria2
pm2 start aria2c --name "aria2" -- --conf-path=/data/data/com.termux/files/home/.config/aria2/aria2.conf

# 5. Start the Python scripts (full paths are more reliable)
pm2 start ~/tg_bridge.py --name "tg_bridge" --interpreter python
pm2 start ~/auto_189.py --name "auto_189" --interpreter python

# 6. crond
pkill -9 crond # kill any old instance first so it doesn't wedge
crond
echo "✅ 定时任务 (crond) 已重置启动"

# Save the process list
pm2 save

echo "==================================================="
echo "🎉 矩阵全面复活!所有服务已纳入不死守护!"
echo "👉 输入 'pm2 list' 可查看所有服务运行状态。"
echo "👉 输入 'pm2 logs' 可查看所有服务的实时报错日志。"
echo "==================================================="
EOF

chmod +x ~/start.sh

What does this change buy you?
True immortality: if the Cloudflare tunnel drops because of a network blip (and its process exits), or one of your Python scripts crashes, PM2 catches it instantly and restarts it automatically. You never have to intervene by hand.

Dead-simple startup: you only ever run ./start.sh once. Because the script ends with pm2 save, if your phone reboots or Termux gets killed, you just reopen Termux and type a single pm2 resurrect: every service comes back exactly as it was, without re-running ./start.sh.
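Conceptually, pm2 save / pm2 resurrect is just snapshot-and-replay of the managed process list. A toy Python sketch of the idea (the dump.json file name and record layout here are illustrative; PM2's real dump lives at ~/.pm2/dump.pm2 in its own format):

```python
import json

def pm2_save(processes, dump_path):
    """Like `pm2 save`: snapshot the currently managed process list to disk."""
    with open(dump_path, "w", encoding="utf-8") as f:
        json.dump(processes, f)

def pm2_resurrect(dump_path):
    """Like `pm2 resurrect`: read the snapshot back to know what to restart."""
    with open(dump_path, encoding="utf-8") as f:
        return json.load(f)

procs = [{"name": "openlist", "cmd": "./openlist server"},
         {"name": "frpc", "cmd": "frpc -c frpc.toml"}]
pm2_save(procs, "dump.json")
print(pm2_resurrect("dump.json"))
```

This is why resurrect needs no arguments: everything it has to know was written out by the last save.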

Built-in debugging: previously you had to hunt through separate log files (tunnel.log, run.log, and so on). Now a single pm2 logs streams the output and errors of every service, color-coded, in one place; whichever one is misbehaving is obvious at a glance.

Kill off all the old processes first (or just restart Termux), run the new ./start.sh once, then type pm2 list and admire the process table. You'll see how comfortable this setup is.

3. What it does
PM2 (relentless-supervisor mode)
Running services under PM2 is like hiring workers and also hiring a supervisor who never sleeps and watches them around the clock.

Auto-restart (the core difference): if Cloudflare drops its connection and exits, or a Python script crashes, PM2 notices the worker is down and restarts it within about a second. The service recovers on its own, without you lifting a finger.
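At its core, that behavior is just a supervision loop. A toy Python sketch of the idea (not PM2's actual implementation, which also handles exponential backoff, log capture, and per-process state):

```python
import subprocess
import sys
import time

def supervise(cmd, max_restarts=3, backoff=0.1):
    """Rerun `cmd` whenever it exits with an error, up to `max_restarts` times.
    Returns how many restarts were performed."""
    restarts = 0
    while True:
        result = subprocess.run(cmd)
        if result.returncode == 0:
            return restarts          # clean exit: nothing to revive
        if restarts >= max_restarts:
            return restarts          # give up, like PM2 hitting its restart limit
        restarts += 1
        time.sleep(backoff)          # brief pause before reviving the worker

# A worker that always crashes gets revived until the limit is reached:
crashes = supervise([sys.executable, "-c", "import sys; sys.exit(1)"], max_restarts=2)
print(crashes)  # 2
```

PM2 runs this kind of loop for every process in `pm2 list`, which is why a crashed tunnel or bot comes back without you noticing.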

Never off duty: as long as Android hasn't killed the Termux app itself, your services will never stay down because of a crash.
Beyond the "immortality", PM2 beats nohup in three more ways:
Comparison 1: is the service actually running?

  • nohup: you type the clunky ps aux | grep cloudflared and fish the PID out of a wall of text.

  • PM2: you type pm2 list and get a neatly formatted table: which services are online (shown in green), how many times each has restarted, how much memory each uses. All at a glance.

Comparison 2: where are the error logs?

  • nohup: you hunt for log files all over the place, cat tunnel.log here, cat run.log there. Extremely painful.

  • PM2: you type pm2 logs and every service's live output and errors scroll past, color-coded and in chronological order. Debugging becomes far more pleasant.

Comparison 3: stopping a single service (say, just the TG bot)?

  • nohup: you first dig out the PID with ps, then gingerly kill -9 <PID>, and you might still kill the wrong process.

  • PM2: pm2 stop tg_bridge, done. To bring it back: pm2 start tg_bridge. Start and stop at will.

Summary
There was nothing wrong with your old script; it's just that in a background environment as hostile as Android's, nohup is too fragile. It only "births" processes, it never "raises" them. Bringing in PM2 wraps your services in a resurrection layer, so you can stop babysitting them and re-running scripts by hand.

See the value of this "supervisor" now? Try typing pm2 list in your Termux and take a look.

4. Cleaning up processes

pm2 delete all

5. Startup procedure
First launch

./start.sh

From then on
One-command manual resurrection
Whenever you reopen Termux, this single line is all you need:

pm2 resurrect

6. Full automation (ready the moment Termux opens)
If you don't even want to type pm2 resurrect, and would rather have Termux revive every service the instant it starts, add the commands to Termux's shell startup file:
6.1. Edit the config file:

nano ~/.bashrc

6.2. At the very end of the file, append on new lines:

# Resurrect all PM2-managed services
pm2 resurrect
# Start the cron daemon
crond
# Hold a CPU wakelock so the screen turning off doesn't drop the network
termux-wake-lock

6.3. Press Ctrl+O then Enter to save, then Ctrl+X to exit.

7. Everyday maintenance commands
Now that the "regular army" is in charge, these commands are worth memorizing; they make troubleshooting much easier:

Who is running: pm2 list (check that each status is a green online).

Error logs: pm2 logs (if a service can't connect, the reason shows up here in real time).

Restart one service: pm2 restart frpc (restarts only FRP, nothing else).

Stop one service: pm2 stop auto_189 (temporarily switch off the show-tracking script).


II. Fixing cloudflared tunnel disconnects
1. FlClash settings
Tuning FlClash's low-level rules (the hardcore route)
If you'd rather not configure a proxy inside Emby, or your scraper has no proxy setting, you'll have to operate on FlClash itself.

Core idea: take Termux out of the bypass list (let FlClash handle Termux traffic so scraping keeps working), but use low-level rules to force Cloudflare tunnel traffic to pass straight through.

Fix the Fake-IP interception:
The UDP 53 timeout in yesterday's log happened because FlClash's Fake-IP mechanism was intercepting Cloudflare's DNS resolution.

Go to FlClash: Settings -> DNS -> Fake-IP filter (the Fake-IP bypass list).

Add one line: +.argotunnel.com

(FlClash will then stop intercepting the tunnel's DNS requests.)

Fix the traffic routing:

In FlClash's Rules, add two custom DIRECT rules:
DOMAIN-SUFFIX,argotunnel.com,DIRECT
IP-CIDR,8.8.4.4/32,DIRECT

(The Cloudflare tunnel uses 8.8.4.4 under the hood for some of its requests, so that IP must be allowed to connect directly.)

🎯 Result: Termux traffic goes through the proxy globally and Emby scraping pulls artwork instantly; yet whenever the Cloudflare tunnel reaches out to its servers, FlClash waves it through. The two never interfere.
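What those two rules actually decide can be modeled in a few lines. The sketch below mimics Clash-style DOMAIN-SUFFIX and IP-CIDR matching for the two rules above; it is an illustration of the matching logic, not FlClash's real rule engine:

```python
import ipaddress

DIRECT_SUFFIXES = ["argotunnel.com"]
DIRECT_CIDRS = [ipaddress.ip_network("8.8.4.4/32")]

def route(target):
    """Return DIRECT if the target hits a bypass rule, else PROXY."""
    try:
        addr = ipaddress.ip_address(target)
        # IP targets: IP-CIDR matching.
        if any(addr in net for net in DIRECT_CIDRS):
            return "DIRECT"
    except ValueError:
        # Not an IP: DOMAIN-SUFFIX matching (the suffix itself or any subdomain).
        for s in DIRECT_SUFFIXES:
            if target == s or target.endswith("." + s):
                return "DIRECT"
    return "PROXY"

print(route("xyz.argotunnel.com"))  # DIRECT
print(route("8.8.4.4"))             # DIRECT
print(route("api.themoviedb.org"))  # PROXY
```

Note that DOMAIN-SUFFIX matches whole labels: notargotunnel.com would still go through the proxy, which is exactly what you want.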

2. NekoBox settings (the recommended method)

2.1 Switch to per-app proxying and select only the apps that actually need the proxy.

2.2 Tap Route at the bottom.

Find the Direct rules (or custom rules).

In the Domain Suffix (or Domain) list, enter these two lines:

argotunnel.com
cftunnel.com

Save the configuration and restart the NekoBox connection.

2.3. Add a "TCP" safety net to the tunnel inside Termux
Because NekoBox now handles all traffic, the tunnel's default QUIC (UDP) transport is liable to be recycled by the proxy core's connection pool. Force the tunnel onto the stable TCP transport instead:
Back in Termux, relaunch it with the http2 protocol flag:

pm2 delete cloudflared
pm2 start cloudflared --name "cloudflared" -- tunnel --protocol http2 run --token "eyJhIjoiYjY3YTc2ZTU4ZjNkMGM2MjllZjcyMTg1YTZmZjQ3MjQiLCJ0IjoiOGM2ZTY5OTAtY2MwYi00ZTk3LWI2ODEtNmQ3OThkZTdhOGEzIiwicyI6IlN4NSsxUWdjcVg1dVdvQk5hWUIvWG5xR091c2N3bDBFMzZHdHdieWFxaGc9In0="
pm2 save

III. auto_189.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
import os
import json
import time
import requests
import re
import subprocess
import random
from urllib import parse
from Crypto.Cipher import PKCS1_v1_5 as Cipher_pksc1_v1_5
from Crypto.PublicKey import RSA
import logging
import schedule
from dotenv import load_dotenv
from datetime import datetime

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
load_dotenv(dotenv_path="sys.env", override=True)

ENV_189_CLIENT_ID = os.getenv("ENV_189_CLIENT_ID", "")
ENV_189_CLIENT_SECRET = os.getenv("ENV_189_CLIENT_SECRET", "")
TG_BOT_TOKEN = os.getenv("ENV_TG_BOT_TOKEN", "")
TG_ADMIN_USER_ID = os.getenv("ENV_TG_ADMIN_USER_ID", "")

SUBS_FILE = "db/subscriptions.json"
HISTORY_FILE = "db/history.json"

def load_json(filepath):
    if os.path.exists(filepath):
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    return {}

def save_json(filepath, data):
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

def clean_filename(name):
    illegal_chars = '"\\/:*?|<>'
    for char in illegal_chars:
        name = name.replace(char, '')
    return name[:255]

def rsaEncrpt(password, public_key):
    rsakey = RSA.importKey(public_key)
    cipher = Cipher_pksc1_v1_5.new(rsakey)
    return cipher.encrypt(password.encode()).hex()

def generate_smart_name(original_filename, sub_path):
    path_parts = sub_path.strip('/').split('/')
    folder_name = path_parts[-1]
    for part in reversed(path_parts):
        if re.match(r'(?i)^Season\s*\d+$|^S\d+$', part.strip()):
            continue
        folder_name = part.strip()
        break
    year_in_path = re.search(r'\((\d{4})\)', folder_name)
    year_str = year_in_path.group(1) if year_in_path else ""
    clean_show_name = folder_name
    clean_show_name = re.sub(r'\(\d{4}\)', '', clean_show_name)
    clean_show_name = re.sub(r'(?i)\b(DV|4K|1080p|720p|2160p|WEB-DL|HDR|SDR|H265|x265|BluRay|Remux)\b', '', clean_show_name)
    clean_show_name = re.sub(r'[-_\s]+$', '', clean_show_name).strip()
    _, ext = os.path.splitext(original_filename)
    if not ext or len(ext) > 5:
        ext = ".mp4"
    ep_patterns = [
        r'(?i)E(?:P)?\s*(\d+)',
        r'第\s*(\d+)\s*[集话期]',
        r'(?:\[|\()(\d+)(?:\]|\))',
        r'\s+0*(\d{1,3})\s*(?:\.|$)'
    ]
    ep_num = None
    for pattern in ep_patterns:
        match = re.search(pattern, original_filename)
        if match:
            ep_num = int(match.group(1))
            break
    if ep_num is None:
        return original_filename
    season_num = 1
    s_match_file = re.search(r'(?i)S0*(\d+)', original_filename)
    if s_match_file:
        season_num = int(s_match_file.group(1))
    else:
        s_match_path = re.search(r'(?i)Season\s*(\d+)', sub_path)
        if s_match_path:
            season_num = int(s_match_path.group(1))
    year_part = f".{year_str}" if year_str else ""
    return f"{clean_show_name}.S{season_num:02d}E{ep_num:02d}{year_part}{ext}"

class TelegramNotifier:
    def __init__(self, bot_token, user_id):
        self.bot_token = bot_token
        self.user_id = user_id
        self.base_url = f"https://api.telegram.org/bot{self.bot_token}/" if self.bot_token else None

    def send_message(self, message):
        if not self.bot_token:
            return False
        try:
            requests.get(f"{self.base_url}sendMessage", params={"chat_id": self.user_id, "text": message}, timeout=10)
            return True
        except Exception:
            return False

class Cloud189ShareInfo:
    def __init__(self, fileId, shareId, shareMode, cloud189Client, accessCode="", is_folder=True, file_name=""):
        self.shareDirFileId = fileId
        self.shareId = shareId
        self.session = cloud189Client.session
        self.client = cloud189Client
        self.shareMode = shareMode
        self.accessCode = accessCode
        self.is_folder = is_folder
        self.file_name = file_name

    def getAllShareFiles(self, folder_id=None):
        if not self.is_folder and folder_id is None:
            return {"files": [{"id": self.shareDirFileId, "name": self.file_name}], "folders": []}
        if folder_id is None:
            folder_id = self.shareDirFileId
        fileList, folders = [], []
        pageNumber = 1
        while True:
            result = self.session.get("https://cloud.189.cn/api/open/share/listShareDir.action", params={
                "pageNum": pageNumber, "pageSize": "10000", "fileId": folder_id,
                "shareDirFileId": self.shareDirFileId, "isFolder": "true",
                "shareId": self.shareId, "shareMode": self.shareMode,
                "orderBy": "lastOpTime", "descending": "true", "accessCode": self.accessCode,
            }).json()
            if result['res_code'] != 0:
                break
            fileListAO = result.get("fileListAO", {})
            fileList += fileListAO.get("fileList", [])
            folders += fileListAO.get("folderList", [])
            if fileListAO.get("fileListSize", 0) == 0 and len(fileListAO.get("folderList", [])) == 0:
                break
            pageNumber += 1
        return {"files": fileList, "folders": folders}

    def saveShareFiles(self, tasksInfos, targetFolderId):
        try:
            response = self.session.post("https://cloud.189.cn/api/open/batch/createBatchTask.action", data={
                "type": "SHARE_SAVE", "taskInfos": str(tasksInfos),
                "targetFolderId": targetFolderId, "shareId": self.shareId,
            }).json()
            if response.get("res_code") != 0:
                return response.get('res_message', 'UNKNOWN_ERROR')
            taskId = response["taskId"]
            while True:
                res = self.session.post("https://cloud.189.cn/api/open/batch/checkBatchTask.action", data={
                    "taskId": taskId, "type": "SHARE_SAVE"
                }).json()
                if res["taskStatus"] != 3 or res.get("errorCode"):
                    break
                time.sleep(1)
            return res.get("errorCode")
        except Exception as e:
            return str(e)

class Cloud189:
    def __init__(self):
        self.session = requests.session()
        self.session.headers = {
            'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "application/json;charset=UTF-8",
        }

    def getEncrypt(self):
        return self.session.post("https://open.e.189.cn/api/logbox/config/encryptConf.do", data={'appId': 'cloud'}, timeout=15).json()['data']['pubKey']

    def getRedirectURL(self):
        rsp = self.session.get('https://cloud.189.cn/api/portal/loginUrl.action?redirectURL=https://cloud.189.cn/web/redirect.html?returnURL=/main.action', timeout=15)
        return parse.parse_qs(parse.urlparse(rsp.url).query)

    def login(self, username, password):
        encryptKey = self.getEncrypt()
        query = self.getRedirectURL()
        resData = self.session.post('https://open.e.189.cn/api/logbox/oauth2/appConf.do', data={"version": '2.0', "appKey": 'cloud'}, headers={"Referer": 'https://open.e.189.cn/', "lt": query["lt"][0], "REQID": query["reqId"][0]}, timeout=15).json()
        keyData = f"-----BEGIN PUBLIC KEY-----\n{encryptKey}\n-----END PUBLIC KEY-----"
        data = {
            "appKey": 'cloud', "version": '2.0', "accountType": '01', "mailSuffix": '@189.cn',
            "returnUrl": resData['data']['returnUrl'], "paramId": resData['data']['paramId'],
            "clientType": '1', "isOauth2": "false",
            "userName": f"{{NRP}}{rsaEncrpt(username, keyData)}",
            "password": f"{{NRP}}{rsaEncrpt(password, keyData)}",
        }
        result = self.session.post('https://open.e.189.cn/api/logbox/oauth2/loginSubmit.do', data=data, headers={'Referer': 'https://open.e.189.cn/', 'lt': query["lt"][0], 'REQID': query["reqId"][0]}, timeout=15).json()
        if result['result'] == 0:
            self.session.get(result['toUrl'], headers={"Host": 'cloud.189.cn'}, timeout=15)
        else:
            raise Exception(result['msg'])

    def getShareInfo(self, link):
        url = parse.urlparse(link)
        try:
            code = parse.parse_qs(url.query)["code"][0]
        except Exception:
            code = url.path.split('/')[-1]
        pwd = parse.parse_qs(url.query).get('pwd', [''])[0]
        result = self.session.get("https://cloud.189.cn/api/open/share/getShareInfoByCodeV2.action", params={"shareCode": code}).json()
        if result.get('res_code') == 8001 or "失效" in str(result) or "取消" in str(result) or "不存在" in str(result) or "审核" in str(result):
            raise Exception(f"SHARE_DEAD: 分享已失效或未通过审核 [{result.get('res_message', '未知原因')}]")
        if result.get('res_code') != 0:
            raise Exception(f"获取分享失败,可能掉线: {result}")
        file_id = result.get("fileId")
        share_mode = result.get("shareMode", 1)
        share_id = result.get("shareId")
        raw_is_folder = result.get("isFolder")
        is_folder = True if raw_is_folder is None else str(raw_is_folder).lower() in ['true', '1']
        file_name = result.get("fileName", "未命名文件")
        if pwd:
            verify_res = self.session.get("https://cloud.189.cn/api/open/share/checkAccessCode.action", params={"shareCode": code, "accessCode": pwd}).json()
            if verify_res.get('res_code') != 0:
                raise Exception(f"提取码错误或失效: {verify_res}")
            share_id = verify_res.get("shareId")
        if not share_id:
            raise Exception("未能获取到 shareId,疑似掉线拦截。")
        return Cloud189ShareInfo(file_id, share_id, share_mode, self, pwd, is_folder, file_name)

    def createFolder(self, name, parentFolderId=-11):
        result = self.session.post("https://cloud.189.cn/api/open/file/createFolder.action", data={"parentFolderId": parentFolderId, "folderName": name}).json()
        return result["id"]

    def getObjectFolderNodes(self, folderId=-11):
        res = self.session.post("https://cloud.189.cn/api/portal/getObjectFolderNodes.action", data={"id": folderId, "orderBy": 1, "order": "ASC"}).json()
        if isinstance(res, dict):
            raise Exception(f"账号掉线或被限制 (获取目录失败): {res}")
        return res

    def mkdirAll(self, path, parentFolderId=-11):
        path = path.strip("/")
        if not path:
            return parentFolderId
        for name in path.split("/"):
            found = False
            for node in self.getObjectFolderNodes(parentFolderId):
                if node["name"] == name:
                    parentFolderId = node["id"]
                    found = True
                    break
            if not found:
                parentFolderId = self.createFolder(name, parentFolderId)
        return parentFolderId

    def listPrivateFiles(self, folderId):
        try:
            res = self.session.get("https://cloud.189.cn/api/open/file/listFiles.action", params={"folderId": folderId, "pageNum": 1, "pageSize": 1000}).json()
            if res.get("res_code") == 0:
                return res.get("fileListAO", {}).get("fileList", [])
        except Exception:
            pass
        return []

    def renameFile(self, fileId, destFileName):
        try:
            res = self.session.post("https://cloud.189.cn/api/open/file/renameFile.action", data={"fileId": fileId, "destFileName": destFileName}).json()
            return res.get("res_code") == 0
        except Exception:
            return False

def get_all_share_files_recursive(info, folder_id=None, current_path=""):
    all_files = []
    result = info.getAllShareFiles(folder_id)
    for f in result.get("files", []):
        f["full_path"] = current_path + "/" + f["name"]
        all_files.append(f)
    for folder in result.get("folders", []):
        new_path = current_path + "/" + folder["name"]
        all_files.extend(get_all_share_files_recursive(info, folder["id"], new_path))
    return all_files

def auto_relogin(client):
    logger.info("🔄 触发自动保活机制:正在重新登录...")
    try:
        client.login(ENV_189_CLIENT_ID, ENV_189_CLIENT_SECRET)
        logger.info("✅ 自动重新登录成功!通行证已刷新。")
        return True
    except Exception:
        return False

def check_subscriptions(client, force_target_id=None):
    subs = load_json(SUBS_FILE)
    history = load_json(HISTORY_FILE)
    notifier = TelegramNotifier(TG_BOT_TOKEN, TG_ADMIN_USER_ID)
    if not subs:
        return

    current_time = time.time()
    for target_id, sub_info in subs.items():
        try:
            share_url = sub_info if isinstance(sub_info, str) else sub_info.get("url", "")
            keyword = "" if isinstance(sub_info, str) else sub_info.get("keyword", "")
            path = "" if isinstance(sub_info, str) else sub_info.get("path", "")
            freq = "" if isinstance(sub_info, str) else sub_info.get("freq", "")

            # Hoisted out of the `elif path:` branch so the scheduling update
            # below can use them even on force-triggered checks.
            now = datetime.now()
            curr_h, curr_m, curr_w = now.hour, now.minute, now.weekday()

            if force_target_id and str(target_id) == str(force_target_id):
                pass
            elif path:
                next_check_time = 0 if isinstance(sub_info, str) else sub_info.get("next_check_time", 0)
                if current_time < next_check_time:
                    continue

                if freq == "剧迷":
                    if not ((10 <= curr_h < 12) or (18 <= curr_h < 24)):
                        continue
                elif freq == "周更" or "周更" in path or "动漫" in path:
                    update_weekday = 5 if isinstance(sub_info, str) else sub_info.get("update_weekday", 5)
                    if curr_w < update_weekday:
                        continue
                    is_am = (curr_h == 10 and curr_m >= 30) or (curr_h == 11)
                    is_pm = (curr_h >= 18 and curr_m >= 30) or (curr_h >= 19)
                    if not (is_am or is_pm):
                        continue
                elif freq == "日更" or "日更" in path or "电视剧" in path or "剧" in path:
                    if curr_h < 18:
                        continue

            info = client.getShareInfo(share_url)
            all_files = get_all_share_files_recursive(info)
            if keyword:
                all_files = [f for f in all_files if keyword.lower() in f["full_path"].lower()]

            existing_names = set(v["name"] for k, v in history.items() if isinstance(v, dict) and v.get("sub_id") == str(target_id))
            new_files = [f for f in all_files if str(f["id"]) not in history and clean_filename(f["name"]) not in existing_names]

            if new_files:
                taskInfos = [{"fileId": f["id"], "fileName": clean_filename(f["name"]), "isFolder": 0} for f in new_files]
                for i in range(0, len(taskInfos), 50):
                    batch_tasks = taskInfos[i:i+50]
                    code = info.saveShareFiles(batch_tasks, target_id)

                    if not code:
                        file_names = []
                        renamed_count = 0

                        clean_path = path.strip("/")
                        openlist_target_path = f"/{clean_path}" if clean_path.startswith("177/") or clean_path == "177" else f"/177/{clean_path}"

                        time.sleep(3)
                        cloud_files = client.listPrivateFiles(target_id)

                        for task in batch_tasks:
                            original_name = task["fileName"]
                            history[str(task["fileId"])] = {"name": original_name, "sub_id": str(target_id)}
                            file_names.append(original_name)

                            new_name = generate_smart_name(original_name, path)
                            if new_name != original_name:
                                for cf in cloud_files:
                                    if cf["name"] == original_name:
                                        if client.renameFile(cf["id"], new_name):
                                            renamed_count += 1
                                        break

                        save_json(HISTORY_FILE, history)
                        notifier.send_message(f"✅【追剧更新】\n🔗 来源: {share_url}\n📂 新增文件:\n" + "\n".join(file_names))

                        if renamed_count > 0:
                            notifier.send_message(f"✨ 🚀 云端 API 强制更名: 已规范化 {renamed_count} 个文件!")
                            time.sleep(6)

                        try:
                            subprocess.Popen(["/data/data/com.termux/files/usr/bin/bash", "/data/data/com.termux/files/home/refresh.sh", openlist_target_path])
                            notifier.send_message(f"🔄 已自动触发 Emby 刮削入库: {openlist_target_path}")
                        except Exception:
                            pass

            subs_for_update = load_json(SUBS_FILE)
            if str(target_id) in subs_for_update:
                if isinstance(subs_for_update[str(target_id)], str):
                    subs_for_update[str(target_id)] = {"url": share_url, "keyword": keyword, "path": path}
                subs_for_update[str(target_id)]["last_update"] = time.time()

                if freq == "剧迷":
                    subs_for_update[str(target_id)]["next_check_time"] = time.time() + 1200
                elif freq == "周更" or "周更" in path or "动漫" in path:
                    target_weekday = subs_for_update[str(target_id)].get("update_weekday", 5)
                    days_ahead = target_weekday - curr_w
                    if days_ahead <= 0:
                        days_ahead += 7
                    today_midnight = datetime(now.year, now.month, now.day).timestamp()
                    subs_for_update[str(target_id)]["next_check_time"] = today_midnight + days_ahead * 86400
                elif freq == "日更" or "日更" in path or "电视剧" in path or "剧" in path:
                    subs_for_update[str(target_id)]["next_check_time"] = time.time() + 1800
                else:
                    subs_for_update[str(target_id)]["next_check_time"] = time.time() + 24 * 3600
                save_json(SUBS_FILE, subs_for_update)
        except Exception as e:
            error_msg = str(e)
            if "SHARE_DEAD" in error_msg:
                # Routine patrol found a dead share: notify, then purge the subscription and its history
                subs_for_update = load_json(SUBS_FILE)
                if str(target_id) in subs_for_update:
                    dead_path = subs_for_update[str(target_id)].get("path", "未知") if isinstance(subs_for_update[str(target_id)], dict) else "未知"
                    dead_url = subs_for_update[str(target_id)].get("url", share_url) if isinstance(subs_for_update[str(target_id)], dict) else subs_for_update[str(target_id)]
                    del subs_for_update[str(target_id)]
                    save_json(SUBS_FILE, subs_for_update)
                    notifier.send_message(f"❌ 警告:监测到订阅已失效!\n📁 目录: {dead_path}\n🔗 链接: {dead_url}\n🗑️ 已自动为您取消该订阅并清理历史记忆。")

                    history_data = load_json(HISTORY_FILE)
                    history_data = {k: v for k, v in history_data.items() if not (isinstance(v, dict) and str(v.get("sub_id")) == str(target_id))}
                    save_json(HISTORY_FILE, history_data)
                continue
            elif "掉线" in error_msg or "失败" in error_msg:
                auto_relogin(client)

def main_control_loop(client):
    offset = 0
    notifier = TelegramNotifier(TG_BOT_TOKEN, TG_ADMIN_USER_ID)

    def scheduled_task():
        check_subscriptions(client)
        # Re-arm the patrol with a randomized 25-45 minute interval
        wait_min = random.randint(25, 45)
        schedule.clear('patrol')
        schedule.every(wait_min).minutes.do(scheduled_task).tag('patrol')

    scheduled_task()
    schedule.every(6).hours.do(auto_relogin, client)

    while True:
        schedule.run_pending()
        try:
            url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/getUpdates?offset={offset}&timeout=10"
            res = requests.get(url, timeout=15).json()
            if res.get('ok'):
                for item in res['result']:
                    offset = item['update_id'] + 1
                    msg = item.get('message', {})
                    text = msg.get('text', '')
                    chat_id = msg.get('chat', {}).get('id')

                    if str(chat_id) == str(TG_ADMIN_USER_ID):
                        text = text.strip()
                        match_bind = re.match(r'^(订阅|绑定)(\d)?\s+', text)
                        match_fill = re.match(r'^补档\s+(.*?)\s+(http[s]?://\S+)', text)
                        match_refresh = re.match(r'^(刷新|入库)\s+(.*)', text)

                        if "取消订阅" in text:
                            kw = re.sub(r'^取消订阅\d?\s*', '', text).strip()
                            if not kw:
                                continue
                            subs = load_json(SUBS_FILE)
                            target_id = None
                            for sid, info in subs.items():
                                p = info.get("path", "") if isinstance(info, dict) else info
                                if kw == p or kw == f"/{p.strip('/')}":
                                    target_id = sid
                                    break
                            if not target_id:
                                for sid, info in subs.items():
                                    p = info.get("path", "") if isinstance(info, dict) else info
                                    if kw.lower() in p.lower():
                                        target_id = sid
                                        break

                            if target_id:
                                del subs[str(target_id)]
                                save_json(SUBS_FILE, subs)

                                history_data = load_json(HISTORY_FILE)
                                old_len = len(history_data)
                                history_data = {k: v for k, v in history_data.items() if not (isinstance(v, dict) and str(v.get("sub_id")) == str(target_id))}
                                deleted_count = old_len - len(history_data)
                                save_json(HISTORY_FILE, history_data)

                                notifier.send_message(f"✅ 已成功移除订阅。\n🗑️ 同步清理了 {deleted_count} 条历史文件记忆。")

                        elif match_bind:
                            action = match_bind.group(1)
                            season_num = match_bind.group(2)

                            freq_tag = ""
                            if "#周更" in text:
                                freq_tag, text = "周更", text.replace("#周更", "").strip()
                            elif "#双更" in text:
                                freq_tag, text = "双更", text.replace("#双更", "").strip()
                            elif "#剧迷" in text:
                                freq_tag, text = "剧迷", text.replace("#剧迷", "").strip()
                            elif "#日更" in text:
                                freq_tag, text = "日更", text.replace("#日更", "").strip()

                            weekday_map = {"周一": 0, "周二": 1, "周三": 2, "周四": 3, "周五": 4, "周六": 5, "周日": 6}
                            t_weekday = 5
                            for d_name, d_code in weekday_map.items():
                                if f"#{d_name}" in text:
                                    t_weekday = d_code
                                    text = text.replace(f"#{d_name}", "").strip()
                                    break

                            is_bind = (action == "绑定")
                            parts = text.split()

                            share_url = ""
                            keyword = ""
                            target_path = ""

                            url_index = -1
                            for i, p in enumerate(parts):
                                if p.startswith("http"):
                                    share_url = p
                                    url_index = i
                                    break

                            if url_index != -1:
                                target_path = " ".join(parts[1:url_index])
                                if url_index < len(parts) - 1:
                                    keyword = " ".join(parts[url_index+1:])
                            else:
                                target_path = " ".join(parts[1:])

                            if season_num:
                                s_num = int(season_num)
                                if "season" not in target_path.lower():
                                    target_path = f"{target_path.rstrip('/')}/Season {s_num}"
                                if not keyword:
                                    keyword = f"S{s_num:02d}"

                            if not share_url:
                                subs = load_json(SUBS_FILE)
                                for tid, info in subs.items():
                                    if isinstance(info, dict) and info.get("path") == target_path:
                                        share_url = info.get("url", "")
                                        break

                            if not share_url:
                                continue

                            mode_name = "绑定(静默)" if is_bind else "订阅(下载)"
                            tag_msg = f" ⏱️ 频率: {freq_tag}" if freq_tag else ""
                            kw_msg = f" 🎯 过滤: {keyword}" if keyword else ""
                            day_cn = list(weekday_map.keys())[t_weekday]
                            day_msg = f" 📅 盯梢: {day_cn}" if freq_tag == "周更" else ""

                            notifier.send_message(f"⏳ 正在处理{mode_name}目录:\n📁 {target_path}{tag_msg}{kw_msg}{day_msg} ...")

                            # Validate the link up front so we don't create an empty directory
                            try:
                                info = client.getShareInfo(share_url)
                            except Exception as e:
                                if "SHARE_DEAD" in str(e):
                                    notifier.send_message(f"❌ {action}失败:该分享链接已失效或未通过审核!\n🔗 链接: {share_url}")
                                else:
                                    notifier.send_message(f"❌ {action}失败:无法访问该链接 ({e})")
                                continue

                            try:
                                target_id = client.mkdirAll(target_path)
                                subs = load_json(SUBS_FILE)
                                subs[str(target_id)] = {
                                    "url": share_url, "keyword": keyword, "path": target_path,
                                    "last_update": 0, "freq": freq_tag, "update_weekday": t_weekday,
                                    "next_check_time": 0
                                }
                                save_json(SUBS_FILE, subs)

                                if is_bind:
                                    all_files = get_all_share_files_recursive(info)
                                    if keyword:
                                        all_files = [f for f in all_files if keyword.lower() in f["full_path"].lower()]
                                    history_data = load_json(HISTORY_FILE)
                                    for f in all_files:
                                        history_data[str(f["id"])] = {"name": f["name"], "sub_id": str(target_id)}
                                    save_json(HISTORY_FILE, history_data)
                                    notifier.send_message(f"✅ 成功绑定!\n❇️ 已将 {len(all_files)} 个旧文件标记为已存。")
                                else:
                                    notifier.send_message("✅ 成功添加订阅!正在为您优先拉取资源...")
                                    check_subscriptions(client, force_target_id=target_id)
                            except Exception:
                                pass

                        elif match_fill:
                            keyword_input = match_fill.group(1).strip()
                            share_url = match_fill.group(2).strip()

                            m = re.match(r'^(.*?)\s*[sS第]?0?(\d+)[季]?$', keyword_input)
                            if m and m.group(1).strip():
                                base_kw = m.group(1).strip()
                                s_num = int(m.group(2))
                            else:
                                base_kw = keyword_input
                                s_num = None

                            msg_ext = f"\n🎯 锁定季数: 第 {s_num} 季" if s_num else ""
                            notifier.send_message(f"🔍 启动智能补档...\n🎯 解析剧名: {base_kw}{msg_ext}\n🔗 链接: {share_url}")

                            subs = load_json(SUBS_FILE)
                            matched_target = None

                            for t_id, info in subs.items():
                                path_in_db = info.get("path", "") if isinstance(info, dict) else ""
                                if base_kw.lower() in path_in_db.lower():
                                    if s_num is not None:
                                        s_patterns = [f"season {s_num}", f"s{s_num:02d}", f"s{s_num}"]
                                        if any(p in path_in_db.lower() for p in s_patterns) or str(s_num) in path_in_db.split('/')[-1]:
                                            matched_target = (t_id, path_in_db)
                                            break
                                    else:
                                        matched_target = (t_id, path_in_db)
                                        break

                            if not matched_target:
                                continue

                            target_id, target_path = matched_target
                            notifier.send_message(f"🎯 命中目录: {target_path}\n🚀 正在免手动物理转存...")

                            # Validate the link before attempting the fill
                            try:
                                info = client.getShareInfo(share_url)
                            except Exception as e:
                                if "SHARE_DEAD" in str(e):
                                    notifier.send_message(f"❌ 补档失败:该分享链接已失效或未通过审核!\n🔗 链接: {share_url}")
                                else:
                                    notifier.send_message(f"❌ 补档失败:无法访问该链接 ({e})")
                                continue

                            try:
                                all_files = get_all_share_files_recursive(info)
                                history_data = load_json(HISTORY_FILE)

                                existing_names = set(v["name"] for k, v in history_data.items() if isinstance(v, dict) and v.get("sub_id") == str(target_id))
                                new_files = [f for f in all_files if clean_filename(f["name"]) not in existing_names]

                                clean_path = target_path.strip("/")
                                openlist_target_path = f"/{clean_path}" if clean_path.startswith("177/") or clean_path == "177" else f"/177/{clean_path}"

                                if not new_files:
                                    notifier.send_message("⚠️ 补档完毕:链接里没有新文件,或者与已存文件同名(防重复拦截生效)。")
                                    try:
                                        subprocess.Popen(["/data/data/com.termux/files/usr/bin/bash", "/data/data/com.termux/files/home/refresh.sh", openlist_target_path])
                                    except Exception:
                                        pass
                                else:
                                    taskInfos = [{"fileId": f["id"], "fileName": clean_filename(f["name"]), "isFolder": 0} for f in new_files]
                                    success_count = 0
                                    renamed_count = 0

                                    for i in range(0, len(taskInfos), 50):
                                        batch = taskInfos[i:i+50]
                                        code = info.saveShareFiles(batch, target_id)

                                        if not code:
                                            time.sleep(3)
                                            cloud_files = client.listPrivateFiles(target_id)

                                            for task in batch:
                                                original_name = task["fileName"]
                                                history_data[str(task["fileId"])] = {"name": original_name, "sub_id": str(target_id)}
                                                success_count += 1

                                                new_name = generate_smart_name(original_name, target_path)
                                                if new_name != original_name:
                                                    for cf in cloud_files:
                                                        if cf["name"] == original_name:
                                                            if client.renameFile(cf["id"], new_name):
                                                                renamed_count += 1
                                                            break

                                    save_json(HISTORY_FILE, history_data)
                                    notifier.send_message(f"✅ 补档完美结束!\n📂 自动归档至: {target_path}\n📝 共计抓取 {success_count} 个新文件,底层记忆已更新。")

                                    if renamed_count > 0:
                                        notifier.send_message(f"✨ 🚀 云端 API 强制更名: 已规范化 {renamed_count} 个文件!")
                                        time.sleep(6)

                                    try:
                                        subprocess.Popen(["/data/data/com.termux/files/usr/bin/bash", "/data/data/com.termux/files/home/refresh.sh", openlist_target_path])
                                        notifier.send_message(f"🔄 已自动触发 Emby 刮削入库: {openlist_target_path}")
                                    except Exception:
                                        pass
                            except Exception:
                                pass

                        elif match_refresh:
                            keyword_input = match_refresh.group(2).strip()

                            m = re.match(r'^(.*?)\s*[sS第]?0?(\d+)[季]?$', keyword_input)
                            if m and m.group(1).strip():
                                base_kw = m.group(1).strip()
                                s_num = int(m.group(2))
                            else:
                                base_kw = keyword_input
                                s_num = None

                            msg_ext = f" (第 {s_num} 季)" if s_num else ""
                            notifier.send_message(f"🔍 收到入库指令,正在检索: {base_kw}{msg_ext}...")

                            subs = load_json(SUBS_FILE)
                            matched_paths = []

                            for t_id, info in subs.items():
                                path_in_db = info.get("path", "") if isinstance(info, dict) else ""
                                if base_kw.lower() in path_in_db.lower():
                                    if s_num is not None:
                                        s_patterns = [f"season {s_num}", f"s{s_num:02d}", f"s{s_num}"]
                                        if any(p in path_in_db.lower() for p in s_patterns) or str(s_num) in path_in_db.split('/')[-1]:
                                            if path_in_db not in matched_paths:
                                                matched_paths.append(path_in_db)
                                    else:
                                        if path_in_db not in matched_paths:
                                            matched_paths.append(path_in_db)

                            if matched_paths:
                                notifier.send_message(f"🎯 共命中 {len(matched_paths)} 个关联目录,准备批量刷新...")
                                for mp in matched_paths:
                                    clean_path = mp.strip("/")
                                    openlist_p = f"/{clean_path}" if clean_path.startswith("177/") or clean_path == "177" else f"/177/{clean_path}"
                                    try:
                                        subprocess.run(["/data/data/com.termux/files/usr/bin/bash", "/data/data/com.termux/files/home/refresh.sh", openlist_p], timeout=120)
                                        notifier.send_message(f"✅ 目录刷新成功: {openlist_p}")
                                    except Exception:
                                        pass
                                notifier.send_message("🎉 批量指令已全部呼叫 Emby,请前往查看!")
                            else:
                                fallback_path = base_kw.strip("/")
                                if s_num is not None:
                                    fallback_path = f"{fallback_path}/Season {s_num}"

                                clean_path = fallback_path.strip("/")
                                openlist_p = f"/{clean_path}" if clean_path.startswith("177/") or clean_path == "177" else f"/177/{clean_path}"

                                notifier.send_message(f"⚠️ 库中无记录,触发盲狙刷新:\n📂 {openlist_p}")

                                try:
                                    subprocess.run(["/data/data/com.termux/files/usr/bin/bash", "/data/data/com.termux/files/home/refresh.sh", openlist_p], timeout=120)
                                    notifier.send_message("✅ 盲狙指令已下发!(Emby若存在该路径将自动入库)")
                                except Exception as e:
                                    notifier.send_message(f"❌ 刷新指令下发失败: {e}")

                        else:
                            notifier.send_message("❌ 格式错误...")
        except Exception:
            pass
        time.sleep(2)

if __name__ == '__main__':
    os.makedirs("db", exist_ok=True)

    notifier = TelegramNotifier(TG_BOT_TOKEN, TG_ADMIN_USER_ID)
    notifier.send_message("🤖 私人追剧管家已启动,正在尝试登录...")

    client = Cloud189()
    try:
        logger.info("189正在登录 ...")
        client.login(ENV_189_CLIENT_ID, ENV_189_CLIENT_SECRET)
        notifier.send_message("✅ 网盘登录成功!全天候仿生监控已就位。")
    except Exception as e:
        logger.error(f"登录失败: {e}")
        notifier.send_message(f"❌ 首次登录失败: {e}\n(脚本将直接退出,请处理后重启)")
        exit(-1)

    main_control_loop(client)