- 作者:老汪软件技巧
- 发表时间:2024-09-17 15:03
- 浏览量:
自动化数据处理任务模板,你用就完了:
场景:下载数据,数据清洗,定时执行你司特定任务。
优化方面:增加数据验证,确保上传到 HDFS 的数据符合预期。增加重试机制,在某个步骤失败时自动重试。添加邮件通知,一旦流程完成或失败时,自动发送邮件通知。将任务调度交给 cron 或 Airflow,定期执行该任务。完整Shell脚本:增加数据验证、重试机制和邮件通知
#!/bin/bash
# ========================
# 配置部分
# ========================
REMOTE_SERVER="user@remote.server.com"
REMOTE_FILE_PATH="/path/to/remote/data.csv"
LOCAL_FILE_PATH="/path/to/local/data.csv"
HDFS_RAW_PATH="/user/hadoop/raw_data"
HDFS_CLEANED_PATH="/user/hadoop/cleaned_data"
HIVE_DB="data_analysis"
HIVE_TABLE="cleaned_data"
SPARK_APP_JAR="/path/to/spark-app.jar"
SPARK_MAIN_CLASS="com.example.DataAnalysisApp"
LOG_FILE="/path/to/logs/data_pipeline.log"
EMAIL_RECIPIENT="admin@example.com"
RETRY_LIMIT=3
# 日志记录函数
log() {
echo "$(date +"%Y-%m-%d %H:%M:%S") - $1" >> $LOG_FILE
}
# 错误处理函数
error_exit() {
log "ERROR: $1"
send_email "Data Pipeline Failed" "$1"
exit 1
}
# 邮件通知函数
send_email() {
SUBJECT=$1
BODY=$2
echo "$BODY" | mail -s "$SUBJECT" $EMAIL_RECIPIENT
}
# 重试机制
run_with_retries() {
local CMD="$1"
local RETRIES=0
local SLEEP_INTERVAL=5 # 重试间隔时间
until $CMD; do
((RETRIES++))
if [ $RETRIES -ge $RETRY_LIMIT ]; then
error_exit "Command failed after $RETRY_LIMIT attempts: $CMD"
fi
log "Command failed. Retrying in $SLEEP_INTERVAL seconds... (Attempt $RETRIES)"
sleep $SLEEP_INTERVAL
done
}
# 数据验证函数
validate_data() {
local FILE_PATH=$1
local MIN_FILE_SIZE=100 # 文件的最小大小,以字节为单位
if [ ! -f $FILE_PATH ]; then
error_exit "数据文件不存在: $FILE_PATH"
fi
local FILE_SIZE=$(stat -c%s "$FILE_PATH")
if [ $FILE_SIZE -lt $MIN_FILE_SIZE ]; then
error_exit "数据文件大小小于预期值: $FILE_SIZE 字节"
fi
log "数据文件验证成功: $FILE_PATH (大小: $FILE_SIZE 字节)"
}
# ========================
# 步骤 1: 从远程服务器下载原始数据
# ========================
log "步骤 1: 从远程服务器下载数据..."
run_with_retries "scp $REMOTE_SERVER:$REMOTE_FILE_PATH $LOCAL_FILE_PATH"
log "数据下载成功"
# 验证下载的数据文件
log "步骤 1.1: 验证数据文件..."
validate_data $LOCAL_FILE_PATH
# ========================
# 步骤 2: 将数据上传到 HDFS
# ========================
log "步骤 2: 上传数据到 HDFS..."
run_with_retries "hadoop fs -mkdir -p $HDFS_RAW_PATH && hadoop fs -put $LOCAL_FILE_PATH $HDFS_RAW_PATH"
log "数据上传到 HDFS 成功"
# ========================
# 步骤 3: 使用 Hive 进行数据清洗
# ========================
log "步骤 3: 使用 Hive 进行数据清洗..."
# 定义 Hive 清洗脚本
HIVE_CLEAN_QUERY="
USE $HIVE_DB;
CREATE TABLE IF NOT EXISTS $HIVE_TABLE AS
SELECT *
FROM raw_data
WHERE column1 IS NOT NULL AND column2 != '';
"
# 执行 Hive 清洗查询
run_with_retries "hive -e \"$HIVE_CLEAN_QUERY\""
log "Hive 数据清洗成功"
# ========================
# 步骤 4: 使用 Spark 进行数据分析
# ========================
log "步骤 4: 使用 Spark 进行数据分析..."
run_with_retries "spark-submit --class $SPARK_MAIN_CLASS --master yarn --deploy-mode cluster $SPARK_APP_JAR $HDFS_CLEANED_PATH"
log "Spark 数据分析任务成功完成"
# ========================
# 步骤 5: 清理临时文件并完成任务
# ========================
log "步骤 5: 清理临时文件..."
rm -f $LOCAL_FILE_PATH
log "临时文件清理完成"
# 发送成功的邮件通知
send_email "Data Pipeline Completed" "数据处理管道成功完成。"
log "数据处理管道成功完成"
exit 0
改进部分的解释
数据验证 (validate_data):
重试机制 (run_with_retries):
邮件通知 (send_email):
任务调度
可以使用 cron 或 Airflow 来调度这个任务,下面分别演示如何使用这两者进行定时调度。
1. 使用 cron 调度
cron 是 Linux 中的定时任务调度工具,配置方式简单,适合中小规模任务的调度。
设置每天凌晨 2 点运行该脚本:
# 打开cron编辑器
crontab -e
# 添加如下行,每天凌晨2点运行脚本
0 2 * * * /path/to/your/script.sh
2. 使用 Airflow 调度
Apache Airflow 是一个灵活的任务调度和工作流管理平台,适合更复杂的数据管道调度。
使用 Airflow 调度的步骤:安装并配置 Airflow(Airflow 需要配置 DAGs 文件)。创建一个 Airflow DAG 文件来调度该 Shell 脚本。Airflow 会通过 BashOperator 执行 Shell 脚本。
示例 DAG 文件:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'data_pipeline',
default_args=default_args,
description='定期运行数据处理管道',
schedule_interval='0 2 * * *', # 每天凌晨2点运行
start_date=datetime(2023, 1, 1),
catchup=False
)
run_pipeline = BashOperator(
task_id='run_data_pipeline',
bash_command='/path/to/your/script.sh',
dag=dag,
)
通过这种方式,可以轻松将任务交给 Airflow 定期执行,并且支持任务的监控、重试等高级功能。
总结:
这个改进的 Shell 脚本实现了:
数据验证:在上传到 HDFS 之前检查数据文件是否符合预期。重试机制:在任务失败时,脚本会自动重试,最多 3 次。邮件通知:脚本完成或失败时,会向相关人员发送邮件通知。任务调度:可以使用 cron 或 Airflow 定期调度任务。
这使得整个数据处理管道更加健壮、可维护,并且可以轻松应对常见的失败场景和错误处理。