• 作者:老汪软件技巧
  • 发表时间:2024-09-24 00:03
  • 浏览量:

1. 痛点针对

你是否和我一样,在维护一个中台的服务,比较痛的一个点是,上游服务在“同一条链路(一次用户请求)”中存在“同一个接口”的“多次”调用,并且上游还无法或者不好优化,其带来的后果就是我们下游服务的流量会不合理的翻倍,对于下游服务的压力(cpu、中间件等资源)的无辜消耗就会变多。

终于有解了,让我们一起探索下我们的解决方案。

2. 思路

先说结论:自定义@SessionCache注解,利用TransmittableThreadLocal可以跨线程传递变量的特性,将一次TraceId内的同一个接口的调用返回结果缓存下来,若再有调用直接返回,可明显降低服务CPU以及响应依赖资源的压力,降本增效。

3. 代码实践

tips:如果你不了解TransmittableThreadLocal,建议优先看上一篇文章TransmititableThreadLocal剖析。

3.1. SessionCache

/**
 * @Description 该注解标记的方法的入参不允许带对象,否则会不生效,除非自己实现hashcode
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SessionCache {
}

3.2. SessionCacheAdvisor

在 Spring AOP 中,StaticMethodMatcherPointcut 的 matches 方法、getPointcut 和 getAdvice 是在创建 Spring 应用程序上下文时,以及创建相应的 AOP 代理时被调用的。下面详细解释这些方法是如何以及何时被初始化和调用的:

应用程序上下文初始化: 在 Spring 应用程序启动期间,当应用程序上下文(ApplicationContext)被初始化,Spring 容器会扫描并注册所有的 Bean。如果 Bean 带有特定的注解(如 @Component、@Service、@Repository 等),Spring 容器会自动将其识别为 Bean,并进行初始化。AOP 相关组件注册: @Component 注解的 SessionCacheAdvisor 类也将在此过程中被注册。因为 SessionCacheAdvisor 是 AbstractPointcutAdvisor 的子类,其内部的 pointcut 和 interceptor 信息也需要被初始化。此时,getPointcut 和 getAdvice 方法会被调用,以获取切点和通知对象,它们是定义 AOP 逻辑的核心部分。代理创建: 当判断出某些方法需要被代理时,Spring AOP 使用 Pointcut 对象来判断是否要为目标 Bean 创建一个代理,并将 Advice 应用于那些匹配的方法上。这时:matches 调用: StaticMethodMatcherPointcut 的 matches 方法会在代理对象创建过程中被调用,以检查特定 Bean 的每个方法是否应当被切面逻辑所拦截。这通常发生在创建代理对象的过程中,根据方法签名进行切点匹配。如果 matches 返回 true,则表明给定方法需要被当前切面包含的通知所处理。代理使用: 一旦代理被创建,并且切点与通知被应用,之后对目标 Bean 的方法调用就会通过这个代理。代理内部会根据配置,在合适的时机调用通知的拦截逻辑(在这种情况下是 SessionCacheInterceptor)。

总的来说,getPointcut 和 getAdvice 是在 Spring 容器初始化 AOP 相关组件的时候调用,以获取切面所需的配置信息;matches 方法是在创建代理实例时调用,以决定哪些方法应该被代理拦截。这些步骤主要发生在应用程序的启动阶段。

@Component
public class SessionCacheAdvisor extends AbstractPointcutAdvisor {
    private final StaticMethodMatcherPointcut pointcut = new
            StaticMethodMatcherPointcut() {
                @Override
                public boolean matches(Method method, Class targetClass) {
                    return method.isAnnotationPresent(SessionCache.class);
                }
            };
    @Resource
    private SessionCacheInterceptor interceptor;
    @NotNull
    @Override
    public Pointcut getPointcut() {
        return this.pointcut;
    }
    @NotNull
    @Override
    public Advice getAdvice() {
        return interceptor;
    }
}

3.3. TtlUtil

public class TtlUtil {
    private static final TransmittableThreadLocal<Map<String, Object>> context = new TransmittableThreadLocal<Map<String, Object>>() {
        @Override
        protected Map<String, Object> initialValue() {
            //线程池以及父子线程之间的值是浅拷贝即引用关系,所以使用线程安全的hashmap
            return new ConcurrentHashMap<>(8);
        }
    };
    private static final TransmittableThreadLocal<String> startTraceId = new TransmittableThreadLocal<String>() {
        @Override
        protected String initialValue() {
            return "not start";
        }
    };
    public static Object get(String key, Object defaultVal) {
        return context.get().getOrDefault(key, defaultVal);
    }
    public static void start(String key) {
        try {
            if (!isStart(key)) {
                clean();
                startTraceId.set(key);
            }
        } catch (Exception e) {
            LogUtils.ERROR.error("SessionStartAspect invoke error sessionId:{}", key, e);
        }
    }
    public static boolean isStart(String key) {
        return StringUtils.isNotEmpty(key) && StringUtils.equals(startTraceId.get(), key);
    }
    public static void set(String key, Object value) {
        context.get().put(key, value);
    }
    public static void clean() {
        startTraceId.remove();
        context.remove();
    }
}

3.4. SessionCacheInterceptor

如何解决重复定义__重复调用函数

SessionCacheInterceptor 类是一个 Spring 组件,实现了 MethodInterceptor 接口。这个类设计用于拦截方法调用,并基于从方法调用细节生成的唯一键提供缓存功能。

@Component
@Order
public class SessionCacheInterceptor implements MethodInterceptor {
    private static final String CUT_SYMBOL = "_";
    private static class SessionValueNotExistFlag {
    }
    private final SessionValueNotExistFlag NOT_EXIST_FLAG = new SessionValueNotExistFlag();
    private static final Object NULL_OBJECT = new Object();
    @Override
    public Object invoke(MethodInvocation methodInvocation) throws Throwable {
        String sessionId = MDC.get("traceId");
        if (!TtlUtil.isStart(sessionId)) {
            return methodInvocation.proceed();
        }
        String className = methodInvocation.getMethod().getDeclaringClass().getName();
        
        // 正常情况我们认定两个对象 类名和对象成员属性值->hashcode值 相同则为相同对象
        String instantiateAddress = Integer.toHexString(methodInvocation.getThis().hashCode());
        String methodName = methodInvocation.getMethod().getName();
        Object[] arguments = methodInvocation.getArguments();
        String argumentStr = joinArguments(arguments);
        // sessionId 实例对象地址 _ 类绝对路径名 _ 方法名 _ 序列化参数作为唯一key
        String cacheKey = sessionId.concat(CUT_SYMBOL)
                .concat(instantiateAddress).concat(CUT_SYMBOL)
                .concat(className).concat(CUT_SYMBOL)
                .concat(methodName).concat(CUT_SYMBOL)
                .concat(argumentStr);
        try {
            Object cacheValue = TtlUtil.get(cacheKey, NOT_EXIST_FLAG);
            if (cacheValue != NOT_EXIST_FLAG) {
                return cacheValue == NULL_OBJECT ? null : cacheValue;
            }
        } catch (Exception e) {
            LogUtils.ERROR.error("SessionCacheInterceptor invoke error key:{}", cacheKey, e);
        }
        Object proceed = methodInvocation.proceed();
        Object proceedCache = proceed == null ? NULL_OBJECT : proceed;
        TtlUtil.set(cacheKey, proceedCache);
        return proceed;
    }
}

3.5. SessionCacheEnable

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SessionCacheEnable {
}

3.6. SessionCacheEnableAspect

该类的主要方法是 sessionStart,它使用 @Around("@annotation(xxx.xxx.SessionCacheEnable)") 注解。这意味着该方法将在任何带有 SessionCacheEnable 注解的方法周围执行。

@Component
@Aspect
@Order(1)
public class SessionCacheEnableAspect {
    /**
     * 缓存开启
     */
    @Around("@annotation(xxx.xxx.SessionCacheEnable)")
    public Object sessionStart(ProceedingJoinPoint joinPoint) throws Throwable {
        Object[] args = joinPoint.getArgs();
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
        Class returnType = method.getReturnType();
        try {
            //获取是否禁用sessionCache
            MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
            Method m = methodSignature.getMethod();
            SessionDisable sessionDisable = m.getAnnotation(SessionDisable.class);
            if (Objects.nonNull(sessionDisable)) {
                return joinPoint.proceed(args);
            }
            String sessionId = MDC.get("traceId");
            if (StringUtils.isEmpty(sessionId)){
                return joinPoint.proceed(args);
            }
            LogUtils.COMMON.debug("sessionStart,method:{},traceId:{}", joinPoint.getSignature().getName(), sessionId);
            TtlUtil.start(sessionId);
            return joinPoint.proceed(args);
        } catch (Exception e) {
            LogUtils.ERROR.error("error:", e);
            return wrapperRes(returnType,e);
        } finally {
            //清除内存中的副本数据
            TtlUtil.clean();
        }
    }
}

3.7. SessionDisable

/**
 * @Description: 如果不需要开启sessioncache添加该注解
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SessionDisable {
}

3.8. 示例:

QueryServiceImpl

@Service
public class QueryServiceImpl implements QueryService {
    @SessionCacheEnable
    public Response query(QueryRequest request) {
        ...
        methodA(id);
        ...
    }
}

methodA

@Autowired
private FacadeB facadeB;
@Override
public methodADTO methodA(Long id) {
    ...
    facadeB.methodB(id);
    ...
}

methodB

@SessionCache
public methodBDTO methodB(id) {
       ... 
}

4. 总结

该方法利用 TransmittableThreadLocal 的特性,实现了在一次会话中将多次请求合并的效果,对于该痛点问题十分奏效,大大降低不必要的资源浪费。该注解适合放在在上游Facade层或下游服务的Infrastructure层。