Skip to content

Commit 096e3e1

Browse files
committed
每个表单独判断
1 parent 768c606 commit 096e3e1

File tree

2 files changed

+207
-84
lines changed

2 files changed

+207
-84
lines changed
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# dump-import.md5.pv.sh 流程图
2+
3+
## 整体流程
4+
5+
```mermaid
6+
flowchart TD
7+
A[开始] --> B[环境变量初始化]
8+
B --> C{SSH连接验证}
9+
C -->|失败| D[退出]
10+
C -->|成功| E{首次运行?}
11+
E -->|是| F[初始化目录结构]
12+
E -->|否| G[继续执行]
13+
F --> G
14+
G --> H[启动CPU空闲率监控<br/>异步进程]
15+
H --> I[获取数据库列表]
16+
I --> J[循环处理每个数据库]
17+
```
18+
19+
## 数据库处理流程
20+
21+
```mermaid
22+
flowchart TD
23+
A[开始处理数据库] --> B{是否为系统库<br/>或忽略库?}
24+
B -->|是| C[跳过该库]
25+
B -->|否| D{并发库数量<br/>>= ASYNC_WAIT_DB_MAX?}
26+
D -->|是| E{远端mysqldump<br/>数量 < ASYNC_WAIT_DB_MAX?}
27+
E -->|是| F[等待确认后继续]
28+
E -->|否| G[等待2秒]
29+
G --> D
30+
D -->|否| H[获取该库的表列表]
31+
F --> H
32+
H --> I[循环处理每个表]
33+
I --> J[等待当前库所有表处理完成]
34+
J --> K[更新完成日志]
35+
K --> L{还有更多库?}
36+
L -->|是| A
37+
L -->|否| M[进入最终等待]
38+
```
39+
40+
## 表处理流程(并发)
41+
42+
```mermaid
43+
flowchart TD
44+
A[开始处理表] --> B{本地并发数<br/>< ASYNC_WAIT_MAX?}
45+
B -->|否| C[等待1秒]
46+
C --> B
47+
B -->|是| D[SSH远程执行表检查]
48+
D --> E{MYSQL_DATA_DIR<br/>存在?}
49+
E -->|是| F[检查表文件修改时间]
50+
E -->|否| G[直接导出表]
51+
F --> G
52+
G --> H[导出表数据<br/>计算MD5]
53+
H --> I{MD5文件存在?}
54+
I -->|否| J[标记为新表]
55+
I -->|是| K{MD5有差异<br/>或修改时间改变?}
56+
K -->|是| L[标记为差异表]
57+
K -->|否| M[无差异跳过]
58+
J --> N[异步导入表数据]
59+
L --> N
60+
M --> O[处理下一表]
61+
N --> O
62+
```
63+
64+
## 表导入流程
65+
66+
```mermaid
67+
flowchart TD
68+
A[检测到差异表] --> B[本地mysqldump导出]
69+
B --> C[pv限速传输]
70+
C --> D[导入到目标数据库]
71+
D --> E[记录导入结束时间]
72+
E --> F[继续处理下一个差异]
73+
```
74+
75+
## 最终清理流程
76+
77+
```mermaid
78+
flowchart TD
79+
A[所有库处理完成] --> B{检测mysqldump<br/>进程是否存在?}
80+
B -->|是| C[等待1秒]
81+
C --> B
82+
B -->|否| D[SSH远程清理]
83+
D --> E[移动temp目录到diff]
84+
E --> F[输出结束时间]
85+
F --> G[脚本结束]
86+
```
87+
88+
## 关键参数说明
89+
90+
| 参数 | 默认值 | 说明 |
91+
|------|--------|------|
92+
| `DB_PORT` | 3306 | 数据库端口 |
93+
| `ASYNC_WAIT_MAX` | 100 | 最大并发表数量 |
94+
| `ASYNC_WAIT_DB_MAX` | 10 | 最大并发库数量 |
95+
| `DUMP_PV` | 6m | pv限速 |
96+
| `DUMP_WAIT_SECONDS` | 0.6 | 表处理间隔秒数 |
97+
| `CPUQUOTA` | - | systemd CPU配额限制 |
98+
| `IONICE_C` | - | ionice IO优先级 |
99+
100+
## 并发控制策略
101+
102+
1. **库级别并发控制**: 同时处理的库数量不超过 `ASYNC_WAIT_DB_MAX`
103+
2. **表级别并发控制**: 同时运行的 mysqldump 进程不超过 `ASYNC_WAIT_MAX`
104+
3. **远端监控**: 实时监控远端 mysqldump 进程数量,避免过载
105+
4. **资源限制**: 支持 systemd CPU配额 和 ionice IO优先级 限制

mysqldump/tables-ssh/dump-import.md5.pv.sh

Lines changed: 102 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ ASYNC_WAIT_DB_MAX=${ASYNC_WAIT_DB_MAX:-10}
1717
DUMP_PV=${DUMP_PV:-6m}
1818
DUMP_WAIT_SECONDS=${DUMP_WAIT_SECONDS:-0.6}
1919

20+
# MySQL数据目录,用于加速判断表文件是否修改
21+
MYSQL_DATA_DIR=${MYSQL_DATA_DIR:-}
22+
2023
RUN_LIMIT_START=${RUN_LIMIT_START};
2124
RUN_LIMIT_START_MYSQLDUMP=${RUN_LIMIT_START_MYSQLDUMP};
2225
if [[ -n "$CPUQUOTA" ]];then
@@ -230,99 +233,114 @@ for ((i = 0; i < num_databases; i++)); do
230233
fi
231234
echo $db >> /tmp/databases_count.log;
232235

233-
234-
{
235-
echo "Dumping database: $db"
236-
# 空文件的md5=d41d8cd98f00b204e9800998ecf8427e
237-
command=$(cat << BASH
236+
# 先通过SSH获取表列表
237+
echo "获取数据库 $db 的表列表..."
238+
tables=$(echo "DB_PASS=\"${DB_PASS}\";mysql --user=\"${DB_USER}\" --port=\"${DB_PORT}\" --password=\"\${DB_PASS}\" --host=\"${DB_HOST}\" -N -e \"SHOW TABLES;\" $db 2>/dev/null" | eval "$sshRun 'bash -s'" | tr -d "| " | grep -v -e '^$')
239+
echo "获取到表列表: $tables"
240+
241+
# 转为数组
242+
tables_arr=();
243+
for table in $tables; do
244+
tables_arr+=($table)
245+
done
246+
num_tables=${#tables_arr[@]}
247+
echo "数据库 $db 表数量: $num_tables"
248+
249+
echo $db >> /tmp/databases_count.run.log;
250+
251+
# 本地循环每个表,并发处理
252+
for ((t = 0; t < num_tables; t++)); do
253+
table=${tables_arr[$t]}
254+
255+
# 并发控制:等待当前并发数小于最大值
256+
while true; do
257+
current_jobs=$(pgrep -f "mysqldump" | wc -l)
258+
current_jobs=$(( $current_jobs + 1 ))
259+
if [[ "$current_jobs" -lt "$ASYNC_WAIT_MAX" ]]; then
260+
break
261+
fi
262+
echo "$(date "+%Y-%m-%d %H:%M:%S")--等待并发槽位 $db.$table 当前jobs=$current_jobs ASYNC_WAIT_MAX=$ASYNC_WAIT_MAX"
263+
sleep 1
264+
done
265+
266+
{
267+
# 单个表的SSH判断脚本:同时检查修改时间和MD5
268+
table_command=$(cat << BASH
238269
DB_PASS="${DB_PASS}";
239-
echo '开始运行mysqldump $db';
270+
MYSQL_DATA_DIR="${MYSQL_DATA_DIR}";
271+
table="${table}";
240272
mkdir -p /tmp/dump-import-ssh-temp/$db/;
241-
lastTable='';
242-
$RUN_LIMIT_START_MYSQLDUMP mysqldump --no-tablespaces --log-error=/tmp/dump-import-ssh-temp/mysql_error_log_dir/$db.log --no-create-info --skip-comments --user="${DB_USER}" --port="${DB_PORT}" --password="\${DB_PASS}" --host="${DB_HOST}" $DUMP_ARGS $db | pv -L $DUMP_PV |grep -e '^INSERT INTO' | while read -r line; do
243-
table=\$(echo "\$line" | awk -F '\`' '{print \$2}');
244-
echo -n "\$line" | md5sum | awk -v table="\$table" '{print table, \$1}' >> /tmp/dump-import-ssh-temp/$db/\$table.md5;
245-
if [[ -z "\$lastTable" ]];then
246-
lastTable=\$table;
247-
echo \$lastTable > /tmp/dump-import-ssh-temp/$db.lastTable.log;
248-
fi;
249-
if [[ "\$lastTable" != "\$table" ]];then
250-
if [[ "\$(cat /tmp/dump-import-ssh-temp/$db/\$lastTable.md5 | md5sum)" != "\$(cat /tmp/dump-import-ssh-diff/$db/\$lastTable.md5 | md5sum)" ]];then
251-
echo '有差异表名 '\$lastTable;
273+
mkdir -p /tmp/dump-import-ssh-temp/mtime/$db/;
274+
275+
mtime_changed=0;
276+
277+
# 检查表文件修改时间是否改变(如果MYSQL_DATA_DIR存在)
278+
if [[ -n "\$MYSQL_DATA_DIR" && -d "\$MYSQL_DATA_DIR" ]]; then
279+
db_dir="\${MYSQL_DATA_DIR}/$db";
280+
if [[ -d "\$db_dir" ]]; then
281+
mtime_file="";
282+
if [[ -f "\${db_dir}/\${table}.ibd" ]]; then
283+
mtime_file="\${db_dir}/\${table}.ibd";
284+
elif [[ -f "\${db_dir}/\${table}.MYD" ]]; then
285+
mtime_file="\${db_dir}/\${table}.MYD";
286+
fi
287+
288+
if [[ -n "\$mtime_file" ]]; then
289+
current_mtime=\$(stat -c %Y "\$mtime_file" 2>/dev/null || stat -f %m "\$mtime_file" 2>/dev/null);
290+
stored_mtime=\$(cat /tmp/dump-import-ssh-diff/mtime/$db/\${table}.mtime 2>/dev/null || echo 0);
291+
echo "\$current_mtime" > /tmp/dump-import-ssh-temp/mtime/$db/\${table}.mtime;
292+
293+
if [[ "\$current_mtime" != "\$stored_mtime" ]]; then
294+
mtime_changed=1;
295+
fi
252296
fi
253-
lastTable=\$table;
254-
echo \$lastTable > /tmp/dump-import-ssh-temp/$db.lastTable.log;
255-
fi;
256-
done;
257-
258-
lastTable=\$(cat /tmp/dump-import-ssh-temp/$db.lastTable.log);
259-
if [[ -n "\$lastTable" ]];then
260-
if [[ -f "/tmp/dump-import-ssh-diff/$db/\$lastTable.md5" ]];then
261-
if [[ "\$(cat /tmp/dump-import-ssh-temp/$db/\$lastTable.md5 | md5sum)" != "\$(cat /tmp/dump-import-ssh-diff/$db/\$lastTable.md5 | md5sum)" ]];then
262-
echo '有差异表名 '\$lastTable;
263-
fi;
264-
else
265-
echo '有差异表名 '\$lastTable;
266297
fi
267298
fi
268299
269-
echo '结束运行mysqldump $db';
300+
# 导出表并计算MD5
301+
$RUN_LIMIT_START_MYSQLDUMP mysqldump --no-tablespaces --log-error=/tmp/dump-import-ssh-temp/mysql_error_log_dir/$db-\${table}.log --no-create-info --skip-comments --user="${DB_USER}" --port="${DB_PORT}" --password="\${DB_PASS}" --host="${DB_HOST}" $DUMP_ARGS $db \$table 2>/dev/null | grep -e '^INSERT INTO' | md5sum | awk -v table="\$table" '{print table, \$1}' > /tmp/dump-import-ssh-temp/$db/\$table.md5;
270302
271-
BASH
272-
)
273-
274-
# printf "%s\n" "$command"
275-
276-
# todo 还要管道导出表
277-
# ionice -c3
278-
time (
279-
# printf "%s\n" "$command" | eval "$sshRun 'exec -a \"导出数据库-$db\" bash -s 2> /dev/null'" | while read -r line; do
280-
# printf "%s\n" "$command" | eval "$sshRun 'ionice -c3 bash -s 2> /dev/null'" | while read -r line; do
281-
printf "%s\n" "$command" | eval "$sshRun '$RUN_LIMIT_START bash -c \"exec -a 导出数据库-$db bash -s\" 2> /dev/null'" | while read -r line; do
282-
echo $db.$line;
283-
if [[ $line = "开始运行mysqldump $db" ]];then
284-
echo $db >> /tmp/databases_count.run.log;
285-
continue;
286-
fi
287-
if [[ $line = "结束运行mysqldump $db" ]];then
288-
echo $db >> /tmp/databases_count.end.log;
289-
continue;
290-
fi
291-
292-
table=$(echo $line | awk '{print $2}')
293-
current_jobs=$(pgrep -f "mysqldump" | wc -l)
294-
current_jobs=$(( $current_jobs + 1 ))
295-
296-
if [[ "$current_jobs" -lt "$ASYNC_WAIT_MAX" ]]; then
297-
{
298-
echo "进程数小于最大等待数,异步导入--$db.$table";
299-
time (mysqldump --no-tablespaces --user="${DB_USER}" --port="${DB_TABLE_PORT}" --password="${DB_PASS}" --host="${DB_TABLE_HOST}" $DUMP_ARGS $db "$table" | pv -L $DUMP_PV | mysql --user="${IMPORT_DB_USER}" --password="${IMPORT_DB_PASS}" --host="${IMPORT_DB_HOST}" $IMPORT_ARGS "$db")
300-
echo $(date "+%Y-%m-%d %H:%M:%S")"--导入结束 $db.$table";
301-
} &
302-
sleep $DUMP_WAIT_SECONDS;
303-
else
304-
echo "进程数大于最大等待数,同步等待导入-$db.$table";
305-
# sleep 20;
306-
time (mysqldump --no-tablespaces --user="${DB_USER}" --port="${DB_TABLE_PORT}" --password="${DB_PASS}" --host="${DB_TABLE_HOST}" $DUMP_ARGS $db "$table" | pv -L $DUMP_PV | mysql --user="${IMPORT_DB_USER}" --password="${IMPORT_DB_PASS}" --host="${IMPORT_DB_HOST}" $IMPORT_ARGS "$db")
307-
echo $(date "+%Y-%m-%d %H:%M:%S")"--导入结束 $db.$table";
308-
fi
309-
310-
done;
311-
)
312-
# time (eval "$sshRun 'bash -s'" < $command)
313-
314-
315-
sed -i "/$db/d" /tmp/databases_count.log;
316-
# sed -i "/$db/d" /tmp/databases_count.run.log;
317-
318-
319-
echo $(date "+%Y-%m-%d %H:%M:%S")"--异步导出库--结束--$db----continueBool=$continueBool--------------------------------------";
303+
# 判断是否有差异
304+
temp_md5=\$(cat /tmp/dump-import-ssh-temp/$db/\$table.md5);
320305
306+
if [[ -f "/tmp/dump-import-ssh-diff/$db/\$table.md5" ]]; then
307+
diff_md5=\$(cat /tmp/dump-import-ssh-diff/$db/\$table.md5);
308+
if [[ "\$temp_md5" != "\$diff_md5" ]]; then
309+
echo "diff-sync 有差异表名 \$table";
310+
elif [[ "\$mtime_changed" == "1" ]]; then
311+
echo "diff-sync 文件修改时间改变 \$table";
312+
fi
313+
else
314+
echo "diff-sync 新表 \$table";
315+
fi
321316
322-
#
323-
# }
324-
} &
325-
sleep 2;
317+
BASH
318+
)
319+
320+
# 执行SSH并处理输出
321+
printf "%s\n" "$table_command" | eval "$sshRun '$RUN_LIMIT_START bash -c \"exec -a 导出表-$db.$table bash -s\" 2> /dev/null'" | while read -r line; do
322+
echo "差异判断输出: " . $db.$line;
323+
324+
# 只处理有差异的表
325+
if [[ $line == "diff-sync "* ]]; then
326+
import_table=$(echo $line | awk '{print $3}')
327+
328+
echo "进程数小于最大等待数,异步导入--$db.$import_table";
329+
time (mysqldump --no-tablespaces --user="${DB_USER}" --port="${DB_TABLE_PORT}" --password="${DB_PASS}" --host="${DB_TABLE_HOST}" $DUMP_ARGS $db "$import_table" | pv -L $DUMP_PV | mysql --user="${IMPORT_DB_USER}" --password="${IMPORT_DB_PASS}" --host="${IMPORT_DB_HOST}" $IMPORT_ARGS "$db") || echo "导入失败: $db.$import_table"
330+
echo $(date "+%Y-%m-%d %H:%M:%S")"--导入结束 $db.$import_table";
331+
fi
332+
done
333+
} &
334+
sleep $DUMP_WAIT_SECONDS;
335+
done
336+
337+
# 等待当前库的所有表处理完成
338+
wait
339+
340+
echo $db >> /tmp/databases_count.end.log;
341+
sed -i "/$db/d" /tmp/databases_count.log;
342+
343+
echo $(date "+%Y-%m-%d %H:%M:%S")"--异步导出库--结束--$db--------------------------------------";
326344
# exit;
327345
fi
328346
done

0 commit comments

Comments
 (0)