• 作者:老汪软件技巧
  • 发表时间:2024-09-24 17:02
  • 浏览量:

引言

在函数式编程艺术:构建高效的FunctionalUtils工具类 文中基于函数式接口构建了一个简单的工具类。目前项目刚好出现不同模块之间需要数据交互,且会出现失败,需要重试等功能。因为不同模块之间不同方法都需要重试,故基于函数式接口抽取公共代码构建重试机制工具类。

正文主流框架重试机制简介

在Java中实现RPC调用的重试机制在分布式系统中,远程过程调用(RPC)是服务间进行通信的一种常见方式。由于网络波动、服务不稳定等因素,RPC调用有时会失败。为了提高系统的健壮性,通常需要实现RPC调用的重试机制。

Dubbo的重试机制设置

在Dubbo中,可以通过配置服务提供者(Provider)或服务消费者(Consumer)的retries属性来设置重试次数。默认情况下,Dubbo对失败的同步调用进行重试,但不对异步调用和无返回值的方法进行重试。

以下是如何在Dubbo中设置重试机制的示例:

在Dubbo中,retries参数的值表示除了原始调用外还要进行的重试次数。例如,如果retries设置为2,则总共会进行3次调用(1次原始调用+2次重试)。

Motan的重试机制设置

在Motan中,同样可以通过配置文件设置重试次数。Motan支持在服务端和客户端配置重试策略,通常在客户端进行配置。

以下是如何在Motan中设置重试机制的示例:

在Motan中,retries参数同样表示除了原始调用外的重试次数。

以上简单介绍各大中间件重试配置。但是其只适合不同服务之间重试,但是服务内部重试却无法使用上述中间件重试机制,故此这边需要自我实现一套重试。

编程函数怎么用__编程函数公式大全

自我实现重试工具类

不同功能模块之间调用可能会出现失败需要重试,故此这边基于

将介绍如何在Java中使用RpcRetryUtils类来实现RPC调用的同步和异步重试

package com.dereksmart.crawling.fuc;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/***
 * 提供了同步和异步的RPC调用重试机制
 */
public class RpcRetryUtils {
    private static final ExecutorService executorService = Executors.newCachedThreadPool();
    // 定义重试策略
    public static class RetryPolicy {
        private final int maxAttempts;
        private final long delayBetweenRetries;
        public RetryPolicy(int maxAttempts, long delayBetweenRetries) {
            this.maxAttempts = maxAttempts;
            this.delayBetweenRetries = delayBetweenRetries;
        }
        public int getMaxAttempts() {
            return maxAttempts;
        }
        public long getDelayBetweenRetries() {
            return delayBetweenRetries;
        }
    }
    /***
     *
     * 通用的重试方法
     * 泛型同步方法,用于执行一个 `Supplier` 类型的 `rpcCall`。如果 `rpcCall` 在执行过程中抛出异常,该方法将根据 `retryPolicy` 重试调用
     * @param rpcCall
     * @param retryPolicy
     * @param exceptionHandler
     * @param 
     * @return
     */
    public static  T callWithRetry(Supplier rpcCall, RetryPolicy retryPolicy, Consumer exceptionHandler) {
        int attempts = 0;
        while (true) {
            try {
                return rpcCall.get(); // 尝试RPC调用
            } catch (Exception e) {
                attempts++;
                if (attempts >= retryPolicy.getMaxAttempts()) {
                    exceptionHandler.accept(e); // 处理异常
                    throw e; // 如果超过最大尝试次数,抛出异常
                }
                try {
                    Thread.sleep(retryPolicy.getDelayBetweenRetries()); // 等待一段时间后重试
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // 保留中断状态
                    throw new RuntimeException("Retry interrupted", ie);
                }
            }
        }
    }
    /***
     * 泛型异步方法,返回一个 `CompletableFuture`。使用 `attemptCallAsync` 方法来尝试执行 `rpcCall`,并在失败时进行重试。
     * 如果成功,它将调用 `onSuccess` 函数,并将结果传递给 `CompletableFuture`。
     * 如果重试次数用尽,它将调用 `onFailure` 消费者处理异常,并将异常传递给 `CompletableFuture`
     * @param rpcCall
     * @param retryPolicy
     * @param onFailure
     * @param onSuccess
     * @param 
     * @return
     */
    public static  CompletableFuture callWithRetryAsync(
            Supplier rpcCall,
            RetryPolicy retryPolicy,
            Consumer onFailure,
            Function onSuccess) {
        CompletableFuture future = new CompletableFuture<>();
        attemptCallAsync(rpcCall, retryPolicy, 0, future, onFailure, onSuccess);
        return future;
    }
    private static  void attemptCallAsync(
            Supplier rpcCall,
            RetryPolicy retryPolicy,
            int attempt,
            CompletableFuture future,
            Consumer onFailure,
            Function onSuccess) {
        executorService.submit(() -> {
            try {
                T result = rpcCall.get(); // 尝试RPC调用
                future.complete(onSuccess.apply(result)); // RPC调用成功,应用成功处理函数
            } catch (Throwable t) {
                if (attempt < retryPolicy.getMaxAttempts()) { // 检查是否还有剩余尝试次数
                    try {
                        Thread.sleep(retryPolicy.getDelayBetweenRetries()); // 延迟一段时间后重试
                    } catch (InterruptedException ie) {
                        future.completeExceptionally(ie); // 中断时异常处理
                        return;
                    }
                    attemptCallAsync(rpcCall, retryPolicy, attempt + 1, future, onFailure, onSuccess); // 递归调用,增加尝试次数
                } else {
                    onFailure.accept(t); // 调用失败处理函数
                    future.completeExceptionally(t); // 完成CompletableFuture并传递异常
                }
            }
        });
    }
    public static void shutdown() {
        executorService.shutdown();
    }
}

模拟类:

package com.dereksmart.crawling.fuc;
public class RpcService {
    public String fetchData(String a){
        return a+" RPC"+" DEREK_SMART";
    }
    public String fetchDataAyns(String a){
        System.out.println("进来了。。。。。。。。。。。");
         int c =  1/0;
        return a+" RPC"+" DEREK_SMART";
    }
}

测试类:

package com.dereksmart.crawling.fuc;
import java.util.concurrent.CompletableFuture;
public class RpcRetryTest {
    public static void main(String[] args) {
        RpcService rpcService = new RpcService();
        RpcRetryUtils.RetryPolicy retryPolicy = new RpcRetryUtils.RetryPolicy(3, 1000);
        //同步测
        try {
            String result = RpcRetryUtils.callWithRetry(
                    () -> rpcService.fetchData("param"),
                    retryPolicy,
                    e -> System.out.println("RPC同步调用失败: " + e.getMessage())
            );
            System.out.println("RPC调用结果: " + result);
        } catch (Exception e) {
            System.out.println("RPC调用重试后仍然失败.");
        }
        //异步测试
        CompletableFuture future = RpcRetryUtils.callWithRetryAsync(
                () -> rpcService.fetchDataAyns("param"),
                retryPolicy,
                throwable -> System.out.println("RPC调用失败: " + throwable.getMessage()),
                result -> {
                    System.out.println("RPC调用成功,处理结果");
                    return result; // 可以在这里进行结果处理
                }
        ).thenAccept(result -> System.out.println("RPC调用结果: " + result))
                .exceptionally(throwable -> {
                    System.out.println("RPC调用重试后仍然失败: " + throwable.getMessage());
                    return null;
                });
        future.join(); // 阻塞直到future完成// 在适当的时机关闭ExecutorService,比如在所有future都完成后
        RpcRetryUtils.shutdown();
    }
}

结论

通过RpcRetryUtils类,我们可以方便地为Java应用程序中的RPC调用添加重试机制。这不仅可以提高系统的可靠性,还可以通过异步重试提高系统的响应性和性能。在使用线程池时,合理管理资源是非常重要的。

当然上面工具类还有很多不足之处目前只有固定间隔重试重试策略,缺少不同的重试策略,例如:指数退避和带有抖动的退避策略。此外,也少了重试机制中的不同异常处理和错误记录,以及如何优雅地处理重试过程中可能出现的不同类型的异常。