• 作者:老汪软件技巧
  • 发表时间:2024-09-17 15:03
  • 浏览量:

自动化数据处理任务模板,你用就完了:

场景:下载数据,数据清洗,定时执行你司特定任务。

优化方面:增加数据验证,确保上传到 HDFS 的数据符合预期。增加重试机制,在某个步骤失败时自动重试。添加邮件通知,一旦流程完成或失败时,自动发送邮件通知。将任务调度交给 cron 或 Airflow,定期执行该任务。完整Shell脚本:增加数据验证、重试机制和邮件通知

#!/bin/bash
# ========================
# 配置部分
# ========================
REMOTE_SERVER="user@remote.server.com"
REMOTE_FILE_PATH="/path/to/remote/data.csv"
LOCAL_FILE_PATH="/path/to/local/data.csv"
HDFS_RAW_PATH="/user/hadoop/raw_data"
HDFS_CLEANED_PATH="/user/hadoop/cleaned_data"
HIVE_DB="data_analysis"
HIVE_TABLE="cleaned_data"
SPARK_APP_JAR="/path/to/spark-app.jar"
SPARK_MAIN_CLASS="com.example.DataAnalysisApp"
LOG_FILE="/path/to/logs/data_pipeline.log"
EMAIL_RECIPIENT="admin@example.com"
RETRY_LIMIT=3
# 日志记录函数
log() {
    echo "$(date +"%Y-%m-%d %H:%M:%S") - $1" >> $LOG_FILE
}
# 错误处理函数
error_exit() {
    log "ERROR: $1"
    send_email "Data Pipeline Failed" "$1"
    exit 1
}
# 邮件通知函数
send_email() {
    SUBJECT=$1
    BODY=$2
    echo "$BODY" | mail -s "$SUBJECT" $EMAIL_RECIPIENT
}
# 重试机制
run_with_retries() {
    local CMD="$1"
    local RETRIES=0
    local SLEEP_INTERVAL=5  # 重试间隔时间
    until $CMD; do
        ((RETRIES++))
        if [ $RETRIES -ge $RETRY_LIMIT ]; then
            error_exit "Command failed after $RETRY_LIMIT attempts: $CMD"
        fi
        log "Command failed. Retrying in $SLEEP_INTERVAL seconds... (Attempt $RETRIES)"
        sleep $SLEEP_INTERVAL
    done
}
# 数据验证函数
validate_data() {
    local FILE_PATH=$1
    local MIN_FILE_SIZE=100  # 文件的最小大小,以字节为单位
    if [ ! -f $FILE_PATH ]; then
        error_exit "数据文件不存在: $FILE_PATH"
    fi
    local FILE_SIZE=$(stat -c%s "$FILE_PATH")
    if [ $FILE_SIZE -lt $MIN_FILE_SIZE ]; then
        error_exit "数据文件大小小于预期值: $FILE_SIZE 字节"
    fi
    log "数据文件验证成功: $FILE_PATH (大小: $FILE_SIZE 字节)"
}
# ========================
# 步骤 1: 从远程服务器下载原始数据
# ========================
log "步骤 1: 从远程服务器下载数据..."
run_with_retries "scp $REMOTE_SERVER:$REMOTE_FILE_PATH $LOCAL_FILE_PATH"
log "数据下载成功"
# 验证下载的数据文件
log "步骤 1.1: 验证数据文件..."
validate_data $LOCAL_FILE_PATH
# ========================
# 步骤 2: 将数据上传到 HDFS
# ========================
log "步骤 2: 上传数据到 HDFS..."
run_with_retries "hadoop fs -mkdir -p $HDFS_RAW_PATH && hadoop fs -put $LOCAL_FILE_PATH $HDFS_RAW_PATH"
log "数据上传到 HDFS 成功"
# ========================
# 步骤 3: 使用 Hive 进行数据清洗
# ========================
log "步骤 3: 使用 Hive 进行数据清洗..."
# 定义 Hive 清洗脚本
HIVE_CLEAN_QUERY="
  USE $HIVE_DB;
  CREATE TABLE IF NOT EXISTS $HIVE_TABLE AS
  SELECT *
  FROM raw_data
  WHERE column1 IS NOT NULL AND column2 != '';
"
# 执行 Hive 清洗查询
run_with_retries "hive -e \"$HIVE_CLEAN_QUERY\""
log "Hive 数据清洗成功"
# ========================
# 步骤 4: 使用 Spark 进行数据分析
# ========================
log "步骤 4: 使用 Spark 进行数据分析..."
run_with_retries "spark-submit --class $SPARK_MAIN_CLASS --master yarn --deploy-mode cluster $SPARK_APP_JAR $HDFS_CLEANED_PATH"
log "Spark 数据分析任务成功完成"
# ========================
# 步骤 5: 清理临时文件并完成任务
# ========================
log "步骤 5: 清理临时文件..."
rm -f $LOCAL_FILE_PATH
log "临时文件清理完成"
# 发送成功的邮件通知
send_email "Data Pipeline Completed" "数据处理管道成功完成。"
log "数据处理管道成功完成"
exit 0

改进部分的解释

数据验证 (validate_data):

重试机制 (run_with_retries):

邮件通知 (send_email):

任务调度

可以使用 cron 或 Airflow 来调度这个任务,下面分别演示如何使用这两者进行定时调度。

1. 使用 cron 调度

cron 是 Linux 中的定时任务调度工具,配置方式简单,适合中小规模任务的调度。

设置每天凌晨 2 点运行该脚本:

# 打开cron编辑器
crontab -e
# 添加如下行,每天凌晨2点运行脚本
0 2 * * * /path/to/your/script.sh

2. 使用 Airflow 调度

Apache Airflow 是一个灵活的任务调度和工作流管理平台,适合更复杂的数据管道调度。

使用 Airflow 调度的步骤:安装并配置 Airflow(Airflow 需要配置 DAGs 文件)。创建一个 Airflow DAG 文件来调度该 Shell 脚本。Airflow 会通过 BashOperator 执行 Shell 脚本。

示例 DAG 文件:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
    'owner': 'airflow',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'data_pipeline',
    default_args=default_args,
    description='定期运行数据处理管道',
    schedule_interval='0 2 * * *',  # 每天凌晨2点运行
    start_date=datetime(2023, 1, 1),
    catchup=False
)
run_pipeline = BashOperator(
    task_id='run_data_pipeline',
    bash_command='/path/to/your/script.sh',
    dag=dag,
)

通过这种方式,可以轻松将任务交给 Airflow 定期执行,并且支持任务的监控、重试等高级功能。

总结:

这个改进的 Shell 脚本实现了:

数据验证:在上传到 HDFS 之前检查数据文件是否符合预期。重试机制:在任务失败时,脚本会自动重试,最多 3 次。邮件通知:脚本完成或失败时,会向相关人员发送邮件通知。任务调度:可以使用 cron 或 Airflow 定期调度任务。

这使得整个数据处理管道更加健壮、可维护,并且可以轻松应对常见的失败场景和错误处理。