• 作者:老汪软件技巧
  • 发表时间:2024-08-30 15:01
  • 浏览量:

本文简要介绍kubeflow,以及他的部署使用方式。最近在调研kubeflow平台,发现网上资料较少,加上它版本迭代较快有些调用使用方式变化,由于工作内容相关跑通了一些他的主要功能如katib参数调优,kubeflow pipline搭建,多用户创建等具体案例。

kubeflow介绍

Kubeflow 是一个专为在 Kubernetes 上部署、管理和扩展机器学习(ML)工作流而设计的开源平台,旨在简化机器学习项目在生产环境中的部署和操作。基于Kubernetes的容器编排和资源管理功能。通过将机器学习工作流拆分为一系列的容器化任务,Kubeflow可以利用Kubernetes的自动扩展、容错和调度功能,确保机器学习任务的高效执行,Kubeflow 提供了一套完整的工具和服务,支持从数据准备、模型训练、调优到部署的整个机器学习生命周期。这种端到端的解决方案能够帮助数据科学家和工程师快速实现从开发到生产的转换。

整体来说kubeflow有以下组件构成

Kubeflow 中央控制面板为访问 Kubeflow 及其生态系统组件提供了一个经过验证的 Web 界面。作为一个集中式中心,它聚合了集群内各种工具和服务的用户界面,为管理机器学习平台提供了一个统一的接入点。Kubeflow 与Jupyter Notebooks集成,为数据探索、实验和模型开发提供了一个交互式环境。Notebooks 支持各种编程语言,包括 Python、R 和 Scala,允许用户以协作且可再现的方式创建和执行 ML 工作流。Kubeflow Pipelines让用户能够以有向无环图(DAG)的形式定义和执行复杂的 ML 工作流。Kubeflow Pipelines 提供了一种方法,可编排并自动执行数据预处理、模型训练、评估和部署的端到端流程,从而促进了 ML 项目的可重现性、可扩展性和协作性。Kubeflow PipelinesSDK是一组 Python 软件包,允许用户精确而高效地定义和执行机器学习工作流。Kubeflow Training Operator为大规模训练机器学习模型提供了工具。这包括支持使用 TensorFlow、PyTorch 和 XGBoost 等框架进行分布式训练。用户可以利用 Kubernetes 的可扩展性和资源管理功能,跨机器集群高效地训练模型。Kubeflow Serving支持用户将经过训练的 ML 模型部署为可扩展的生产就绪型服务。它为使用 TensorFlow Serving、Seldon Core 等流行框架或自定义推理服务器部署模型提供了一致的界面。模型可在实时或批处理场景中部署,通过 HTTP 端点提供预测。Kubeflow Metadata是一个集中式存储库,用于跟踪和管理与 ML 实验、运行和工件相关的元数据。它为整个工作流提供了一致的 ML 元数据视图,可在 ML 项目中实现可重现性、协作和治理。环境准备

部署kubelfow的前提是有一个集群,我使用的是1.26的kubernetes,用kubeadm安装部署。对应的kubeflow版本为1.8.1

NFS安装

由于安装部署kubeflow的要求是需要有一个default的storage class所以现在给出如何安装部署使用nfs文件系统与nfs-subdir-external-provisioner。

在官方文档中,使用kubeflow mainfest安装的需求是有默认的storageclass,kustomize,与kubectl工具。由于要使用默认的storage class因此部署使用nfs-subdir-external-provisioner。可以自动的创建和管理pv,pvc关系。因此介绍下对应的nfs,nfs插件的安装部署流程。

通过命令apt install -y nfs-kernel-server安装下载nfs服务。

创建一个数据存储目录 mkdir -p /data/redis。

修改/etc/exports文件下的内容,添加新的配置目录/data/redis 192.168.0.0/24(rw,sync,no_all_squash,no_subtree_check,no_root_squash)

完成后重启nfs服务

部署NFS-Subdir-External-Provisioner

NFS-Subdir-External-Provisioner是一个自动配置卷程序,它使用现有的和已配置的 NFS 服务器来支持通过持久卷声明动态配置 Kubernetes 持久卷。在部署使用NFS-subdir服务前你需要一个有nfs服务的节点,并且知道对应的存储目录,在上述示例中他所对应的节点和目录为192.168.0.208,/data/redis

apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-client-provisioner
  namespace: default _#_ _替换成你要部署的_ _Namespace_
---

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: nfs-client-provisioner-runner
rules:
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "update"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "update", "patch"]
---

kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-client-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    namespace: default
roleRef:
  kind: ClusterRole
  name: nfs-client-provisioner-runner
  apiGroup: rbac.authorization.k8s.io
---

kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["get", "list", "watch", "create", "update", "patch"]
---

kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  namespace: default
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    namespace: default
roleRef:
  kind: Role
  name: leader-locking-nfs-client-provisioner
  apiGroup: rbac.authorization.k8s.io

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nfs-client-provisioner
  labels:
    app: nfs-client-provisioner
spec:
  replicas: 1
  strategy:
    type: Recreate                   _##_ _设置升级策略为删除再创建__(__默认为滚动更新__)_
  selector:
    matchLabels:
      app: nfs-client-provisioner
  template:
    metadata:
      labels:
        app: nfs-client-provisioner
    spec:
      serviceAccountName: nfs-client-provisioner
      containers:
        - name: nfs-client-provisioner
          _#image: gcr.io/k8s-staging-sig-storage/nfs-subdir-external-provisioner:v4.0.0_
          image: registry.cn-beijing.aliyuncs.com/xngczl/nfs-subdir-external-provisione:v4.0.0
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME     _## Provisioner__的名称__,__以后设置的__storageclass__要和这个保持一致_
              value: nfs-client
            - name: NFS_SERVER           _## NFS__服务器地址__,__需和__valumes__参数中配置的保持一致_
              value: 192.168.0.208
            - name: NFS_PATH             _## NFS__服务器数据存储目录__,__需和__valumes__参数中配置的保持一致_
              value: /data/redis
      volumes:
        - name: nfs-client-root
          nfs:
            server: 192.168.0.208    _## NFS__服务器地址_
            path: /data/redis           _## NFS__服务器数据存储目录_

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: nfs-storage
  annotations:
    storageclass.kubernetes.io/is-default-class: "true"  _##_ _是否设置为默认的__storageclass_
provisioner: nfs-client                                   _##_ _动态卷分配者名称,必须和上面创建的__"provisioner"__变量中设置的__Name__一致_
parameters:
  archiveOnDelete: "true"                                 _##_ _设置为__"false"__时删除__PVC__不会保留数据__,"true"__则保留数据_

之后kubectl apply -f 上述几个文件。同时给NFS的存储目录加上权限chmod 777 /data/redis

安装kubeflow

在官方gitbub仓库的kubeflow manifest项目中下载对应的二进制文件安装包skustomize_v5.0.3_linux_amd64.tar.gz,解压到/usr/local/bin目录下。下载官方kubeflow maiinfest项目包mainfests-1.8.1.tar.gz

修改默认存储StorageClass

修改yaml,下面每个文件里面添加 storageClassName: nfs-storage,在mainfests-1.8.1目录下的

apps/katib/upstream/components/mysql/pvc.yaml

common/oidc-client/oidc-authservice/base/pvc.yaml

apps/pipeline/upstream/third-party/minio/base/minio-pvc.yaml

apps/pipeline/upstream/third-party/mysql/base/mysql-pv-claim.yaml

修改镜像拉取策略

由于部分组件的镜像拉取策略为Always,所以修改他们为IfNotPresent,在当前目录下执行命令。find ./ -type f -exec grep -l "imagePullPolicy: Always" {} ;查找到所有关于imagePullPolicy为Always的文件。在这一步可以不修改,**在后续部署pod的过程中如果有pod所在节点存在镜像但是还是无法拉取镜像并且运行的话,就修改对应服务的镜像拉取策略。

root@master:~/huhu/kubeflow/manifests-1.8.1# find ./ -type f -exec grep -l "imagePullPolicy: Always" {} \;
./docs/KustomizeBestPractices.md
./contrib/kserve/models-web-app/base/deployment.yaml
./contrib/kserve/kserve/kserve.yaml
./contrib/kserve/kserve/kserve_kubeflow.yaml
./apps/kfp-tekton/upstream/third-party/tekton-custom-task/driver-controller/500-controller.yaml
./apps/kfp-tekton/upstream/third-party/kfp-csi-s3/csi-s3-deployment.yaml
./apps/kfp-tekton/upstream/v1/third-party/kfp-csi-s3/csi-s3-deployment.yaml
./apps/kfp-tekton/upstream/v1/base/cache-deployer/cache-deployer-deployment.yaml
./apps/kfp-tekton/upstream/v1/base/cache/cache-deployment.yaml
./apps/kfp-tekton/upstream/v1/base/pipeline/ml-pipeline-apiserver-deployment.yaml
./apps/kfp-tekton/upstream/v1/base/pipeline/ml-pipeline-viewer-crd-deployment.yaml
./apps/kfp-tekton/upstream/base/cache-deployer/cache-deployer-deployment.yaml
./apps/kfp-tekton/upstream/base/cache/cache-deployment.yaml
./apps/kfp-tekton/upstream/base/pipeline/ml-pipeline-viewer-crd-deployment.yaml
./apps/pipeline/upstream/base/cache-deployer/cache-deployer-deployment.yaml
./apps/pipeline/upstream/base/cache/cache-deployment.yaml
./apps/pipeline/upstream/base/pipeline/ml-pipeline-viewer-crd-deployment.yaml
./common/oidc-client/oidc-authservice/base/statefulset.yaml

修改APP_SECURE_COOKIES

修改对应配置文件将APP_SECURE_COOKIES的值设置为false表示不使用加密cookies交互。Vim ./apps/jupyter/jupyter-web-app/upstream/base/params.env

将其修改为false,否则部署起来后无法通过dashbord访问kubeflow。同样可以执行find ./ -type f -exec grep -l "APP_SECURE_COOKIES" {} ;命令来查看是否还有其他需要修改的APP_SECURE_COOKIES=false的配置,在我的部署过程中目前看来只需要修改jwp的即可

部署kubeflow

在修改完上述对应的配置后,在mainfest目录下执行命令如下可以自动的部署kubeflow,由于它所依赖的组件过多,因此安装过程中大概率会出现问题,上述的配置应该能解决大部分遇到的问题,但是如果遇到安装失败,pod无法正常Running请自行排查解决。

while ! kustomize build example | kubectl apply -f -; do echo "Retrying to apply resources"; sleep 10; done

等待pod全部running,期间会出现错误,逐步排查解决对应的pod问题,大多都是镜像无法拉取,可以手动拉取并且分发上传到对应节点上。同时看需求修改对应deployment,statusfulset的镜像拉取策略为IfNotPresent,就可以修复异常pod。

镜像拉取失败

由于国内使用网络环境的问题,在拉取大部分的镜像的时候可能都会出现失败的情况,因此需要自行解决镜像拉取的问题,如可以使用代理的方式手动下载所需的镜像。下面给出一个可以使用的脚本用于将master节点所缺少的镜像同步下载分发到不同节点。前提是需要sshpass工具。并且节点配置了代理。该工具通过查看所有pod不是running状态并且所需的镜像名字,通过脚本的方式对所需镜像进行下载和打包转发到其他node节点。在脚本中需要配置workernode和用户登录的账户密码。

#!/bin/bash
# 定义节点IP
WORKER_NODES=("192.168.0.xxx" "192.168.0.xxx")
# 登录凭证
USER="xxx"
PASS="xxx"
# 检查是否安装了sshpass
if ! command -v sshpass &> /dev/null
then
    echo "sshpass could not be found. Please install it first."
    exit 1
fi
# 函数:显示进度条
show_progress() {
    local duration=$1
    local sleep_interval=0.1
    local progress=0
    local bar_size=40
    while [ $progress -lt 100 ]; do
        local num_bars=$((progress * bar_size / 100))
        printf "\r[%-${bar_size}s] %d%%" $(printf "#%.0s" $(seq 1 $num_bars)) $progress
        progress=$((progress + 1))
        sleep $sleep_interval
    done
    echo
}
echo "==============="
echo "检查需要下载的镜像"
echo "==============="
# 获取所有Pod状态
pods=$(kubectl get po -A -o wide)
# 提取处于ImagePullBackOff状态的Pod的镜像
failed_pods=$(echo "$pods" | awk '$4 ~ /ImagePullBackOff/ {print $2","$1}')
failed_images=""
for pod_info in $failed_pods
do
    IFS=',' read -r pod namespace <<< "$pod_info"
    images=$(kubectl get pod $pod -n $namespace -o jsonpath='{.spec.containers[*].image}')
    failed_images="$failed_images $images"
done
failed_images=$(echo $failed_images | tr ' ' '\n' | sort | uniq)
  
# 检查并添加缺失的标签
corrected_images=""
for image in $failed_images; do
    if [[ "$image" != *:* ]]; then
        # 如果镜像没有标签,默认添加 :latest
        image="${image}:latest"
    fi
    corrected_images="$corrected_images $image"
done
  
if [ -z "$corrected_images" ]; then
    echo "没有找到需要下载的镜像。"
    exit 0
fi
echo "列出所需的镜像:"
echo "$corrected_images"
echo
echo "==============="
echo "下载所需镜像"
echo "==============="
# 在主节点上下载镜像
for image in $corrected_images
do
    if [ -f "/tmp/${image##*/}.tar" ]; then
        echo "跳过下载镜像 $image, 因为它已经存在于/tmp目录中"
    else
        # 检查镜像是否包含 .io 域名
        if [[ "$image" == *".io"* ]]; then
            prefixed_image="$image"
        else
            echo "镜像 $image 不包含前缀,尝试添加 docker.io"
            prefixed_image="docker.io/$image"
        fi
  
        echo "正在拉取镜像 $prefixed_image"
        ctr -n k8s.io i pull "$prefixed_image"
        if [ $? -eq 0 ]; then
            echo "导出镜像 $prefixed_image"
            ctr -n k8s.io i export /tmp/${prefixed_image##*/}.tar "$prefixed_image"
            show_progress 2
        else
            if [[ "$image" != *".io"* ]]; then
                echo "尝试使用 gcr.io 前缀拉取镜像 $image"
                prefixed_image="gcr.io/$image"
                ctr -n k8s.io i pull "$prefixed_image"
                if [ $? -eq 0 ]; then
                    echo "导出镜像 $prefixed_image"
                    ctr -n k8s.io i export /tmp/${prefixed_image##*/}.tar "$prefixed_image"
                    show_progress 2
                else
                    echo "拉取镜像 $image 失败, 跳过"
                fi
            else
                echo "拉取镜像 $image 失败, 跳过"
            fi
        fi
    fi
done
for i in "${!WORKER_NODES[@]}"; do
    worker=${WORKER_NODES[$i]}
    node_num=$((i+1))
    echo "==========="
    echo "传输到 node${node_num} 上 (${worker})"
    echo "==========="
    for image in $corrected_images
    do
        if [ -f "/tmp/${image##*/}.tar" ]; then
            echo "传输镜像 ${image##*/} 到 node${node_num}"
            sshpass -p ${PASS} scp -o StrictHostKeyChecking=no /tmp/${image##*/}.tar ${USER}@${worker}:/tmp/
            if [ $? -eq 0 ]; then
                show_progress 3
                echo "在 node${node_num} 上导入镜像 ${image##*/}"
                sshpass -p ${PASS} ssh -o StrictHostKeyChecking=no ${USER}@${worker} "ctr -n k8s.io i import /tmp/${image##*/}.tar && rm /tmp/${image##*/}.tar"
                show_progress 2
            else
                echo "传输镜像 ${image##*/} 到 node${node_num} 失败, 跳过"
            fi
        else
            echo "跳过传输镜像 ${image##*/} 到 node${node_num}, 因为它不在/tmp目录中"
        fi
    done
done
echo "镜像拉取和分发完成。"

修改svc为nodeport模式

由于部署起来的svc全都是cluser ip的类型所以无法直接被外部访问,因此需要手动修改istio-ingressgateway的svc为nodeport类型

k edit svc -n istio-system istio-ingressgateway

通过修改后暴露出来的端口进行访问默认的登录账号密码为12341234

使用kubeflow

这里按照模块介绍下 Kubeflow 的几个核心组件。

模型开发-Notebook

Kubeflow Notebooks 是 Kubeflow 平台中一个关键的组件,它为数据科学家和机器学习工程师提供了在 Kubernetes 上运行 Jupyter Notebooks 的能力。这一功能的出现极大地简化了在云环境中管理和使用 Jupyter Notebook 的复杂性。通过 Kubeflow Notebooks,用户能够在 Kubernetes 集群中轻松创建和管理多个 Jupyter Notebook 实例。这些实例可以针对特定用户进行资源配置,如 CPU、内存和 GPU,以确保在多用户环境中能够实现高效的资源隔离和使用

通过notebook新建一个task

在配置页面可以设置名你在,对应的cpu,gpu的调配,创建好之后会出现对应的pod。关于gpu的使用可以参考官方文档的介绍/docs/compon…

点击connect链接到对应的pod中,在这一步会遇到镜像拉取失败的问题,需要手动拉取镜像到指定节点等待pod运行起来。

**ctr -n k8s.io i pull xxx**

模型训练

在创建notebook的时候可以进行镜像的选择,在这里我们选择带有tensorflow的镜像,就可以直接在里面使用对应的框架。同时还有不同的如pytorch-cuda镜像等提供。创建好后通过如下的示例来跑一个简单的训练代码。

import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Dropout, Conv2D, MaxPooling2D
_#_ _加载__MNIST__数据集_
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()
_#_ _预处理数据:调整形状并归一化_
train_images = train_images.reshape(-1, 28, 28, 1).astype('float32') / 255.0
test_images = test_images.reshape(-1, 28, 28, 1).astype('float32') / 255.0
_#_ _将标签转换为_ _one-hot_ _编码_
train_labels = to_categorical(train_labels, 10)
test_labels = to_categorical(test_labels, 10)
_#_ _构建模型_
model = Sequential([
    Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    MaxPooling2D(pool_size=(2, 2)),
    Conv2D(64, kernel_size=(3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(10, activation='softmax')
])
_#_ _编译模型_
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy'])
_#_ _训练模型_
model.fit(train_images, train_labels, epochs=10, batch_size=64, validation_data=(test_images, test_labels))
_#_ _评估模型_
test_loss, test_accuracy = model.evaluate(test_images, test_labels, verbose=0)
print(f"Test accuracy: {test_accuracy:.4f}")
# 保存模型
model.save('mnist_cnn_model.keras')
print("模型保存成功")

运行后得到结果

在新的jupyter文件中通过import模型来导入。使用自己手写的图片来进行结果的预测。

import numpy as np
import tensorflow as tf
from PIL import Image, ImageOps, ImageFilter
import matplotlib.pyplot as plt
_#_ _加载保存的模型_
loaded_model = tf.keras.models.load_model('mnist_cnn_model.keras')
定义你自己的图片文件名列表
image_files = [
    'mnist_test_0_label_9.png',
    'mnist_test_2_label_8.png',
    'mnist_test_4_label_2.png',
    'mnist_test_1_label_0.png',
    'mnist_test_3_label_7.png'
]
_#image_files = ['__图片__.jpg', '123.jpg', 'third.jpg', '9.jpg']_
def preprocess_image(img):
    _#_ _转换为灰度图像_
    img = img.convert('L')
    _#_ _自动对比度增强_
    img = ImageOps.autocontrast(img)
    _#_ _裁剪数字的边缘并居中_
    img = img.crop(img.getbbox())  _#_ _裁剪非空白区域_
    img = img.resize((20, 20), Image.Resampling.LANCZOS)  _#_ _调整图像大小,保持最大信息_
    background = Image.new('L', (28, 28), 0)  _#_ _创建黑色背景_
    offset = ((28 - img.size[0]) // 2, (28 - img.size[1]) // 2)
    background.paste(img, offset)  _#_ _将图像粘贴到背景上使其居中_
    return background
_#_ _创建一个图形,包含__5__行__2__列的子图_
fig, axs = plt.subplots(5, 2, figsize=(10, 25))
_#fig, axs = plt.subplots(4, 2, figsize=(10, 25))_
for i, file in enumerate(image_files):
    _#_ _加载图像_
    img = Image.open(file)
    _#_ _对图像进行预处理_
    processed_img = preprocess_image(img)
    _#_ _将图像转换为数组并进行标准化,确保形状为_ _(28, 28, 1)_
    img_array = np.array(processed_img).reshape(1, 28, 28, 1).astype('float32') / 255.0
    _#_ _进行预测_
    predictions = loaded_model.predict(img_array)
    _#_ _获取预测结果_
    predicted_digit = np.argmax(predictions[0])
    _#_ _显示原始图片_
    axs[i, 0].imshow(img, cmap='gray')
    axs[i, 0].set_title(f'原始图片: {file}')
    axs[i, 0].axis('off')
    _#_ _显示处理后的图片_
    axs[i, 1].imshow(processed_img, cmap='gray')
    axs[i, 1].set_title(f'预测结果: {predicted_digit}')
    axs[i, 1].axis('off')
    print(f"{file} 预测的数字是: {predicted_digit}")
plt.tight_layout()
plt.show()

GPU训练

使用gpu镜像会要求我们的集群中存在GPU资源如下所示

当我们选中gpu镜像,想要添加gpu使会提示集群中不存在gpu,所以需要在某个节点插上物理gpu然后再集群中添加operator来使用gpu资源下面给出安装nvidia-gpu operator的方法:参考官方安装nvidia-operator链接

_什么是部署模型_部署方案如何写

下载准备helm3
curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 \

    && chmod 700 get_helm.sh \
    && ./get_helm.sh
确保NFD模式是关闭的,如果有开启的那么手动关闭它
kubectl get nodes -o json | jq '.items[].metadata.labels | keys | any(startswith("feature.node.kubernetes.io"))'
添加helm仓库
helm repo add nvidia https://helm.ngc.nvidia.com/nvidia \

    && helm repo update
部署gpu-operator
helm install --wait --generate-name \
    -n gpu-operator --create-namespace \
    nvidia/gpu-operator \
    --set driver.version=535

其中需要注意的是对应的驱动版本我用的是A800因此他是535.根据自己的nvidia-gpu型号确定自己的驱动版本。运行完成后会出现

并且节点出现可调度资源/gpu

打开kubelfow开始使用gpu训练任务。需要再这个页面指定含有cuda的镜像,并且在gpu配置中选择集群中可用的gpu nvidia。

创建好后进入jupyter内创建一个python3工具。在其中可以进行机器学习代码的开发。同时可以使用nvidia-smi命令在pod内部查看和适用到我们的GPU

下面给出使用gpu训练的代码

import os
import tensorflow as tf
import time
import numpy as np
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt
print("====检查 GPU 可用性====")
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
_#_ _检查_ _GPU_ _是否可用_
if tf.test.is_gpu_available():
    print("\033[1;32m[GPU 可用] 将进行 GPU 和 CPU 训练对比\033[0m")
    gpu_device = tf.config.list_physical_devices('GPU')[0]
    print(f"可用的 GPU: {gpu_device}")
else:
    print("\033[1;31m[GPU 不可用] 只能使用 CPU 进行训练\033[0m")
    exit()
print("\n====加载和预处理数据====")
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()
train_images = train_images.reshape((60000, 28, 28, 1)).astype('float32') / 255
test_images = test_images.reshape((10000, 28, 28, 1)).astype('float32') / 255
def create_model():
    model = models.Sequential([
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dense(10, activation='softmax')
    ])
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model
_# GPU_ _训练_
print("\n====开始 GPU 训练====")
with tf.device('/GPU:0'):
    gpu_model = create_model()
    start_time = time.time()
    gpu_history = gpu_model.fit(train_images, train_labels, epochs=10,
                                validation_split=0.2, batch_size=64, verbose=1)
    gpu_time = time.time() - start_time
_# CPU_ _训练_
print("\n====开始 CPU 训练====")
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'  _#_ _禁用_ _GPU_
with tf.device('/CPU:0'):
    cpu_model = create_model()
    start_time = time.time()
    cpu_history = cpu_model.fit(train_images, train_labels, epochs=10,
                                validation_split=0.2, batch_size=64, verbose=1)
    cpu_time = time.time() - start_time
_#_ _结果对比_
print("\n====训练时间对比====")
print(f"\033[1;34mGPU 训练时间: {gpu_time:.2f} 秒\033[0m")
print(f"\033[1;34mCPU 训练时间: {cpu_time:.2f} 秒\033[0m")
print(f"\033[1;32mGPU 加速比: {cpu_time / gpu_time:.2f}x\033[0m")
_#_ _绘制训练过程的损失和准确率曲线_
def plot_history(history, title):
    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']
    epochs = range(1, len(acc) + 1)
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(epochs, loss, 'bo-', label='Training loss')
    plt.plot(epochs, val_loss, 'ro-', label='Validation loss')
    plt.title(f'{title} - Training and validation loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(epochs, acc, 'bo-', label='Training accuracy')
    plt.plot(epochs, val_acc, 'ro-', label='Validation accuracy')
    plt.title(f'{title} - Training and validation accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()
print("\n====可视化 GPU 训练过程====")
plot_history(gpu_history, "GPU Training")
print("\n====可视化 CPU 训练过程====")
plot_history(cpu_history, "CPU Training")
_#_ _评估_ _GPU_ _模型_
print("\n====评估 GPU 训练的模型====")
test_loss, test_acc = gpu_model.evaluate(test_images, test_labels, verbose=0)
print(f'\n\033[1;32mTest accuracy: {test_acc:.4f}\033[0m')
_#_ _保存_ _GPU_ _训练的模型_
print("\n====保存 GPU 训练的模型====")
gpu_model.save('mnist_model_gpu.keras')
print("模型已保存为 mnist_model_gpu.keras")

这段代码的主要功能是通过对比 GPU 和 CPU 在相同任务上的训练表现,来体现 GPU 的强大计算能力,尤其是在深度学习任务中的显著优势。代码首先检查当前环境中是否可用 GPU,并根据设备的可用性分别在 GPU 和 CPU 上训练一个简单的卷积神经网络(CNN),该网络用于对 MNIST 手写数字数据集进行分类。

同时在训练的过程中使用nvidia-smi命令可以查看到硬件GPU的使用情况

比如上图中可以看出在训练过程中GPU的利用率分别在9%左右,说明我们的任务成功的调用了GPU进行计算。

pipline

Kubeflow Pipelines 是 Kubeflow 项目中的一个核心模块,专注于构建、部署和管理复杂的机器学习工作流。它提供了一整套用于设计和自动化机器学习流水线的工具,使数据科学家和工程师能够更加高效地构建、管理和监控机器学习模型的训练和部署过程。通过 Kubeflow Pipelines,用户可以轻松地定义、分享、重用和自动化复杂的工作流。在pipline中可以上传制作好的.gz文件yaml文件等。创建好流程图后会有如下图形界面上显示。具体的制作过程可以参考官方文档…

import kfp
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset, Model
_# Step 1:_ _数据下载和预处理_
@component(
    base_image='python:3.8-slim',
    packages_to_install=[
        'pandas',
        'scikit-learn',
        'joblib',
        'numpy',
        'requests'
    ]
)
def preprocess_data_op(output_data: Output[Dataset]):
    print("开始执行 preprocess_data_op...")
    try:
        import pandas as pd
        print("成功导入 pandas 模块。")
    except ImportError as e:
        print(f"导入 pandas 失败: {e}")
        raise e
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    import os
    print("正在下载数据集...")
    url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"
    columns = ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome']
    data = pd.read_csv(url, names=columns)
    print("数据集下载完成。")
    _#_ _数据清洗和特征工程_
    print("正在进行数据清洗和特征工程...")
    X = data.drop('Outcome', axis=1)
    y = data['Outcome']
    _#_ _标准化特征_
    print("正在标准化特征...")
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    _#_ _划分训练集和测试集_
    print("正在划分训练集和测试集...")
    X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)
    _#_ _保存预处理后的数据_
    print("正在保存预处理后的数据...")
    os.makedirs(output_data.path, exist_ok=True)
    pd.DataFrame(X_train).to_csv(os.path.join(output_data.path, 'X_train.csv'), index=False)
    pd.DataFrame(X_test).to_csv(os.path.join(output_data.path, 'X_test.csv'), index=False)
    pd.DataFrame(y_train).to_csv(os.path.join(output_data.path, 'y_train.csv'), index=False)
    pd.DataFrame(y_test).to_csv(os.path.join(output_data.path, 'y_test.csv'), index=False)
    print(f"数据预处理完成并已保存到 {output_data.path}。")
_# Step 2:_ _模型训练_
@component(
    base_image='python:3.8-slim',
    packages_to_install=[
        'pandas',
        'scikit-learn',
        'joblib',
        'numpy'
    ]
)
def train_model_op(input_data: Input[Dataset], output_model: Output[Model]):
    print("开始执行 train_model_op...")
    try:
        import pandas as pd
        print("成功导入 pandas 模块。")
    except ImportError as e:
        print(f"导入 pandas 失败: {e}")
        raise e
    from sklearn.linear_model import LogisticRegression
    import joblib
    import os
    print("正在加载训练数据...")
    X_train = pd.read_csv(os.path.join(input_data.path, 'X_train.csv'))
    y_train = pd.read_csv(os.path.join(input_data.path, 'y_train.csv'))
    _#_ _训练模型_
    print("正在训练模型...")
    model = LogisticRegression()
    model.fit(X_train, y_train.values.ravel())
    _#_ _创建输出目录并保存模型_
    os.makedirs(output_model.path, exist_ok=True)  _#_ _确保输出目录存在_
    model_path = os.path.join(output_model.path, 'trained_model.joblib')
    joblib.dump(model, model_path)
    print(f"模型训练完成并已保存到 {model_path}。")
_# Step 3:_ _模型评估_
@component(
    base_image='python:3.8-slim',
    packages_to_install=[
        'pandas',
        'scikit-learn',
        'joblib',
        'numpy'
    ]
)
def evaluate_model_op(input_data: Input[Dataset], input_model: Input[Model]):
    print("开始执行 evaluate_model_op...")
    try:
        import pandas as pd
        print("成功导入 pandas 模块。")
    except ImportError as e:
        print(f"导入 pandas 失败: {e}")
        raise e
    from sklearn.metrics import accuracy_score
    import joblib
    import os  # 添加os模块的导入
    print("正在加载测试数据和模型...")
    X_test = pd.read_csv(os.path.join(input_data.path, 'X_test.csv'))
    y_test = pd.read_csv(os.path.join(input_data.path, 'y_test.csv'))
    model = joblib.load(os.path.join(input_model.path, 'trained_model.joblib'))
    _#_ _预测和评估_
    print("正在进行模型预测和评估...")
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"模型准确率: {accuracy}")
# Step 4: Pipeline 定义
@dsl.pipeline(
    name='Diabetes Classifier Pipeline',
    description='A pipeline to train and evaluate a diabetes classifier model'
)
def diabetes_pipeline():
    preprocess = preprocess_data_op()
    train = train_model_op(input_data=preprocess.outputs['output_data'])
    evaluate = evaluate_model_op(input_data=preprocess.outputs['output_data'], input_model=train.outputs['output_model'])
# Compile the pipeline
if __name__ == "__main__":
    kfp.compiler.Compiler().compile(diabetes_pipeline, 'diabetes_pipeline.yaml')

上述是一个可以使用的pipline文件,在pipline中打开主要流程如下所示

运行之后可能还需要一个pvc文件

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kubeflow-test-pv
  namespace: kubeflow-user-example-com
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 128Mi

运行之后的结果

这个Python文件主要定义了一个基于Kubeflow Pipelines (KFP) 的数据处理与机器学习模型训练与评估的Pipeline,用于处理糖尿病分类任务。

对于需要多次执行,或者不同训练任务有共同的预处理项目。可以根据名字来使用之前处理好的缓存。提升训练效率

katib

AutoML 是机器学习比较热的领域,主要用来模型自动优化和超参数调整,这里其实是用的 Katib来实现的,一个基于k8s的 AutoML 项目,详细见/kubeflow/ka…Katib 主要提供了 超参数调整(Hyperparameter Tuning),早停法(Early Stopping)和神经网络架构搜索(Neural Architecture Search) Katib 支持多种超参数优化算法,包括随机搜索、贝叶斯优化和 HyperBand。这些算法可以帮助开发者在给定的参数空间中高效地搜索最佳超参数配置。通过定义实验,用户可以指定要调整的超参数及其范围、优化目标(如准确率或损失),以及使用的搜索算法。Katib 还支持提前停止策略,允许在训练过程中根据模型性能动态终止不佳的实验,从而节省计算资源。此外,Katib 不仅限于超参数调优,还提供了神经网络结构搜索(Neural Architecture Search, NAS)的功能。尽管这一功能仍在不断完善中,但它为用户提供了探索不同网络架构的可能性,进一步提升模型的表现。

新建文件notebooks如6.1节所示。创建好一个py文件后,在terminal中执行下面命令下载所需的库。

pip install kubeflow-katib

import kubeflow.katib as katib
_# Step 1. Create an objective function._
def objective(parameters):
    _# Import required packages._
    import time
    time.sleep(5)
    _# Calculate objective function._
    result = 4 * int(parameters["a"]) - float(parameters["b"]) ** 2
    _# Katib parses metrics in this format: =._
    print(f"result={result}")
_# Step 2. Create HyperParameter search space._
parameters = {
    "a": katib.search.int(min=10, max=20),
    "b": katib.search.double(min=0.1, max=0.2)
}
_# Step 3. Create Katib Experiment._
katib_client = katib.KatibClient()
name = "tune-experiment"
katib_client.tune(
    name=name,
    objective=objective,
    parameters=parameters,
    objective_metric_name="result",
    max_trial_count=12
)
_# Step 4. Get the best HyperParameters._
print(katib_client.get_optimal_hyperparameters(name))

运行后可以在autoML中查看参数优化文件和整体流程。

这段代码通过定义一个目标函数来自动进行超参数优化,使用随机搜索算法对该目标函数的形式是 result=4a - b^2,其中参数 a 是整数,范围为 10 到 20,参数 b 是浮点数,范围为 0.1 到 0.2。首先,代码定义了这个目标函数,然后创建了超参数的搜索空间。接着,使用Kubeflow Katib的 KatibClient 启动实验,并指定目标函数、参数范围和最大实验次数(12次)。最后,通过 katib_client.get_optimal_hyperparameters(name) 方法获取并输出最优的超参数组合,从而实现自动化的超参数调优,以优化指定的目标函数

a的取值范围为10-20,b的取值范围为0.1-0.2。在这个条件下使用随机搜索算法找到F的最大值。

下面给出一个更加复杂的计算式与使用其他算法计算的示例:

import kubeflow.katib as katib
_# Step 1. Create an objective function._
def objective(parameters):
    _# Import required packages._
    import time
    import math
    time.sleep(5)
    _# Calculate a more complex objective function._
    a = int(parameters["a"])
    b = float(parameters["b"])
    _#_ _复杂的目标函数示例:结合多项式、对数和三角函数的组合_
    result = (3 * a ** 2 + 2 * b - math.sin(a * b)) / (1 + math.log(b + 0.1)) + math.sqrt(abs(a - b))
    _# Katib parses metrics in this format: =._
    print(f"result={result}")
_# Step 2. Create HyperParameter search space._
parameters = {
    "a": katib.search.int(min=10, max=20),
    "b": katib.search.double(min=0.1, max=0.2)
}
_# Step 3. Create Katib Experiment._
katib_client = katib.KatibClient()
name = "tune-experiment-for-kalibbase"
katib_client.tune(
    name=name,
    objective=objective,
    parameters=parameters,
    algorithm_name="bayesianoptimization",
objective_metric_name="result",
#objective_type="minimize",  # 指定最小化目标函数
    max_trial_count=12
)
_# Step 4. Get the best HyperParameters._
print(katib_client.get_optimal_hyperparameters(name))

用更加复杂的函数去优化参数,同时使用了贝叶斯搜索算法

多用户使用kubeflow

Kubelfow支持使用profile的方式创建新用户。编辑一下文件创建用户

apiVersion: kubeflow.org/v1beta1
kind: Profile
metadata:
  name: test _#_ _用户__namespace_
spec:
  owner:
    kind: User
    name: huhu@test.com _#_ _用户名_

通过Kubectl get profile来查看创建状态

之后由于kubeflow是通过configmap来管理用户信息的。所以需要修改dex组件的configmapKubectl edit cm -n auth dex

    - email: huhu@test.com
      hash: $xxxxxordASFdg3Vmi
      username: xxx

其中email为刚刚创建的用户的账户,hash为登录的密码但是以hash的方式展示。

需要apt install python3-pip 安装pip。创建密码方式前提为pip install passlib bcrypt 之后执行一下的python代码

python3 -c 'from passlib.hash import bcrypt; import getpass; print(bcrypt.using(rounds=12, ident="2y").hash(getpass.getpass()))'

执行命令后输入想要加密的登录密码,如huhu,之后会生成一段hash值,将其填入上述的cm配置中。保存退出后,重启pod就可以用新用户登录kubeflow

将kubeflow与向量数据库milvus结合使用

对于大量模型训练来说,有一个好的向量数据库可以更加方便的处理数据的输入和输出,因此现在给出将kubeflow与milvus向量数据库结合起来使用。

部署milvus

详见我的另一篇技术文档”向量化数据库milvus”

在jupyter中使用k8s集群中的milvus

由于milvus是部署在k8s集群中的,因此在kubeflow中的jupyter中使用需要将milvus准备好。下面给出一个直接的示例

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np
_#_ _连接到_ _Milvus_ _实例_
connections.connect(alias="default", host="192.168.0.208", port="31011")
print("已连接到 Milvus")
_#_ _创建集合_
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128)
]
schema = CollectionSchema(fields, description="用于演示的集合")
collection = Collection(name="demo_collection_2", schema=schema)  _#_ _使用新的集合名称_
print(f"集合 {collection.name} 已创建")
_#_ _插入向量数据_
vectors = np.random.random((20, 128)).astype(np.float32)  _#_ _插入__20__个向量_
collection.insert([list(range(20)), vectors])
print("已将数据插入集合")
_#_ _为向量字段创建索引_
index_params = {
    "metric_type": "L2",
    "index_type": "IVF_FLAT",  _#_ _选择适合的索引类型_
    "params": {"nlist": 128}
}
collection.create_index(field_name="vector", index_params=index_params)
print("索引已创建")
_#_ _加载集合_
collection.load()
print("集合已加载到内存")
_#_ _查询向量数据_
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
query_vectors = np.random.random((5, 128)).astype(np.float32)  _#_ _查询__5__个向量_
search_results = collection.search(query_vectors, "vector", search_params, limit=3)
for i, result in enumerate(search_results):
    print(f"查询向量 {i+1} 的前三个结果: {result}")

在jupyter中新建一个python文件,将上述代码复制进去。处理好依赖相关的库运行。

milvus存储非结构化数据

由于milvus是向量数据库,因此下面给出一个可以将图片作为非结构数据存储到数据库中的方式

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Model
from PIL import Image
import matplotlib.pyplot as plt
import time
  
# 连接到 Milvus 实例
connections.connect(alias="default", host="192.168.0.208", port="31011")
print("已连接到 Milvus")
  
# 加载预训练的ResNet50模型并去掉最后的分类层,用于特征提取
base_model = ResNet50(weights='imagenet')
model = Model(inputs=base_model.input, outputs=base_model.get_layer('avg_pool').output)
  
# 动态生成集合名称,避免冲突
collection_name = f"image_collection_{int(time.time())}"
  
# 定义集合
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),  # 使用 auto_id 自动生成ID
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=2048),  # ResNet50的输出是2048维向量
    FieldSchema(name="img_path", dtype=DataType.VARCHAR, max_length=255)  # 存储图片路径
]
schema = CollectionSchema(fields, description="用于存储图片向量的集合")
collection = Collection(name=collection_name, schema=schema)
print(f"集合 {collection.name} 已创建")
  
# 图片列表
image_paths = ["3.jpg", "7.jpg", "9.jpg", "third.jpg"]
  
# 提取并存储图片特征向量和路径
all_features = []
all_paths = []
  
for img_path in image_paths:
    # 加载图片并进行预处理
    img = image.load_img(img_path, target_size=(224, 224))
    x = image.img_to_array(img)
    x = np.expand_dims(x, axis=0)
    x = preprocess_input(x)
  
    # 提取图片的特征向量
    features = model.predict(x).flatten().tolist()  # 转换为列表以插入 Milvus
    # 收集数据
    all_features.append(features)
    all_paths.append(img_path)
  
# 将所有图片的特征向量和路径存储到 Milvus 中
entities = [
    all_features,  # 向量数据
    all_paths  # 路径数据
]
collection.insert(entities)
print(f"图片已存储")
  
# 为向量字段创建索引
index_params = {
    "metric_type": "L2",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128}
}
collection.create_index(field_name="vector", index_params=index_params)
print("索引已创建")
  
# 加载集合
collection.load()
print("集合已加载到内存")
  
# 查询与某张图片最相似的图片
query_img_path = "third.jpg"  # 要查询的图片
  
# 加载查询图片并提取特征向量
img = image.load_img(query_img_path, target_size=(224, 224))
x = image.img_to_array(img)
x = np.expand_dims(x, axis=0)
x = preprocess_input(x)
query_features = model.predict(x).flatten().tolist()
  
# 查询与这张图片最相似的图片(只返回一个结果)
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
search_results = collection.search([query_features], "vector", search_params, limit=1, output_fields=["img_path"])
  
# 读取查询结果并展示图片
for hits in search_results:
    for hit in hits:
        result_img_path = hit.entity.get('img_path')
        if result_img_path:
            try:
                result_img = Image.open(result_img_path)
                plt.imshow(result_img)
                plt.title(f"检索到的图片: {result_img_path}")
                plt.axis('off')
                plt.show()
            except Exception as e:
                print(f"无法打开图片 {result_img_path}: {e}")
        else:
            print("未找到 'img_path' 字段")

运行后得到结果如下所示,我们通过上述的代码存储了4张图片进入milvus由于是非结构化向量存储,所以我们首先需要对图片进行向量化处理,代码中通过一个模型的最后一层输入,将一个图片分解为2048维的向量。通过将4张图片都分解后用一个2048维度的数组表示一张图片

最后通过查询的方式,将我们想要的照片从数据库中查询出来。通过这个例子很好的展示了如何使用向量化数据库milvus以及如何将一个图片进行向量化存储与查询的方式。

总结

一个简单的machine learning运行流程如上所示

整个流水线包括以下几部分:

基于上述功能描述我们其实可以基于 kubeflow 的 pipeline 和 kfserving 功能轻松实现一个简单的 MLOps 流水线发布流程。Kubeflow 是一个开源的机器学习平台,专为 Kubernetes 设计,旨在简化机器学习工作流的部署和管理。它将多种机器学习工具和框架整合到一个统一的生态系统中,提供了从数据准备到模型训练、优化和部署的全生命周期管理。

Kubeflow 的设计理念是提供一个全面、易用、可扩展的机器学习平台,利用 Kubernetes 的核心优势,如自动化部署、扩展和管理容器化应用程序。对于机器学习项目来说,Kubeflow 不仅提高了开发和部署的效率,还确保了解决方案的可移植性和可维护性。对于希望在 Kubernetes 上运行机器学习工作负载的团队而言,Kubeflow 提供了强大的工具和资源,使得机器学习的创新和实施更加便捷和高效。Kubeflow的工作全都可以在jupyter中完成,如可以在jupyter中创建pipline,创建kalib等。这些通过调用api的方式都可以直接创建生成对应的workflow。让机器学习工程师在使用的过程中方便的管理自己的模型并且可以便利的进行参数调优以及通过搭建构建pipline的方式让其完成一系列流水线的操作,如数据清洗,批量操作。