- 作者:老汪软件技巧
- 发表时间:2024-09-24 00:03
- 浏览量:
1. 痛点针对
你是否和我一样,在维护一个中台的服务,比较痛的一个点是,上游服务在“同一条链路(一次用户请求)”中存在“同一个接口”的“多次”调用,并且上游还无法或者不好优化,其带来的后果就是我们下游服务的流量会不合理的翻倍,对于下游服务的压力(cpu、中间件等资源)的无辜消耗就会变多。
终于有解了,让我们一起探索下我们的解决方案。
2. 思路
先说结论:自定义@SessionCache注解,利用TransmittableThreadLocal可以跨线程传递变量的特性,将一次TraceId内的同一个接口的调用返回结果缓存下来,若再有调用直接返回,可明显降低服务CPU以及响应依赖资源的压力,降本增效。
3. 代码实践
tips:如果你不了解TransmittableThreadLocal,建议优先看上一篇文章TransmititableThreadLocal剖析。
3.1. SessionCache
/**
* @Description 该注解标记的方法的入参不允许带对象,否则会不生效,除非自己实现hashcode
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SessionCache {
}
3.2. SessionCacheAdvisor
在 Spring AOP 中,StaticMethodMatcherPointcut 的 matches 方法、getPointcut 和 getAdvice 是在创建 Spring 应用程序上下文时,以及创建相应的 AOP 代理时被调用的。下面详细解释这些方法是如何以及何时被初始化和调用的:
应用程序上下文初始化: 在 Spring 应用程序启动期间,当应用程序上下文(ApplicationContext)被初始化,Spring 容器会扫描并注册所有的 Bean。如果 Bean 带有特定的注解(如 @Component、@Service、@Repository 等),Spring 容器会自动将其识别为 Bean,并进行初始化。AOP 相关组件注册: @Component 注解的 SessionCacheAdvisor 类也将在此过程中被注册。因为 SessionCacheAdvisor 是 AbstractPointcutAdvisor 的子类,其内部的 pointcut 和 interceptor 信息也需要被初始化。此时,getPointcut 和 getAdvice 方法会被调用,以获取切点和通知对象,它们是定义 AOP 逻辑的核心部分。代理创建: 当判断出某些方法需要被代理时,Spring AOP 使用 Pointcut 对象来判断是否要为目标 Bean 创建一个代理,并将 Advice 应用于那些匹配的方法上。这时:matches 调用: StaticMethodMatcherPointcut 的 matches 方法会在代理对象创建过程中被调用,以检查特定 Bean 的每个方法是否应当被切面逻辑所拦截。这通常发生在创建代理对象的过程中,根据方法签名进行切点匹配。如果 matches 返回 true,则表明给定方法需要被当前切面包含的通知所处理。代理使用: 一旦代理被创建,并且切点与通知被应用,之后对目标 Bean 的方法调用就会通过这个代理。代理内部会根据配置,在合适的时机调用通知的拦截逻辑(在这种情况下是 SessionCacheInterceptor)。
总的来说,getPointcut 和 getAdvice 是在 Spring 容器初始化 AOP 相关组件的时候调用,以获取切面所需的配置信息;matches 方法是在创建代理实例时调用,以决定哪些方法应该被代理拦截。这些步骤主要发生在应用程序的启动阶段。
@Component
public class SessionCacheAdvisor extends AbstractPointcutAdvisor {
private final StaticMethodMatcherPointcut pointcut = new
StaticMethodMatcherPointcut() {
@Override
public boolean matches(Method method, Class> targetClass) {
return method.isAnnotationPresent(SessionCache.class);
}
};
@Resource
private SessionCacheInterceptor interceptor;
@NotNull
@Override
public Pointcut getPointcut() {
return this.pointcut;
}
@NotNull
@Override
public Advice getAdvice() {
return interceptor;
}
}
3.3. TtlUtil
public class TtlUtil {
private static final TransmittableThreadLocal<Map<String, Object>> context = new TransmittableThreadLocal<Map<String, Object>>() {
@Override
protected Map<String, Object> initialValue() {
//线程池以及父子线程之间的值是浅拷贝即引用关系,所以使用线程安全的hashmap
return new ConcurrentHashMap<>(8);
}
};
private static final TransmittableThreadLocal<String> startTraceId = new TransmittableThreadLocal<String>() {
@Override
protected String initialValue() {
return "not start";
}
};
public static Object get(String key, Object defaultVal) {
return context.get().getOrDefault(key, defaultVal);
}
public static void start(String key) {
try {
if (!isStart(key)) {
clean();
startTraceId.set(key);
}
} catch (Exception e) {
LogUtils.ERROR.error("SessionStartAspect invoke error sessionId:{}", key, e);
}
}
public static boolean isStart(String key) {
return StringUtils.isNotEmpty(key) && StringUtils.equals(startTraceId.get(), key);
}
public static void set(String key, Object value) {
context.get().put(key, value);
}
public static void clean() {
startTraceId.remove();
context.remove();
}
}
3.4. SessionCacheInterceptor
SessionCacheInterceptor 类是一个 Spring 组件,实现了 MethodInterceptor 接口。这个类设计用于拦截方法调用,并基于从方法调用细节生成的唯一键提供缓存功能。
@Component
@Order
public class SessionCacheInterceptor implements MethodInterceptor {
private static final String CUT_SYMBOL = "_";
private static class SessionValueNotExistFlag {
}
private final SessionValueNotExistFlag NOT_EXIST_FLAG = new SessionValueNotExistFlag();
private static final Object NULL_OBJECT = new Object();
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
String sessionId = MDC.get("traceId");
if (!TtlUtil.isStart(sessionId)) {
return methodInvocation.proceed();
}
String className = methodInvocation.getMethod().getDeclaringClass().getName();
// 正常情况我们认定两个对象 类名和对象成员属性值->hashcode值 相同则为相同对象
String instantiateAddress = Integer.toHexString(methodInvocation.getThis().hashCode());
String methodName = methodInvocation.getMethod().getName();
Object[] arguments = methodInvocation.getArguments();
String argumentStr = joinArguments(arguments);
// sessionId 实例对象地址 _ 类绝对路径名 _ 方法名 _ 序列化参数作为唯一key
String cacheKey = sessionId.concat(CUT_SYMBOL)
.concat(instantiateAddress).concat(CUT_SYMBOL)
.concat(className).concat(CUT_SYMBOL)
.concat(methodName).concat(CUT_SYMBOL)
.concat(argumentStr);
try {
Object cacheValue = TtlUtil.get(cacheKey, NOT_EXIST_FLAG);
if (cacheValue != NOT_EXIST_FLAG) {
return cacheValue == NULL_OBJECT ? null : cacheValue;
}
} catch (Exception e) {
LogUtils.ERROR.error("SessionCacheInterceptor invoke error key:{}", cacheKey, e);
}
Object proceed = methodInvocation.proceed();
Object proceedCache = proceed == null ? NULL_OBJECT : proceed;
TtlUtil.set(cacheKey, proceedCache);
return proceed;
}
}
3.5. SessionCacheEnable
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SessionCacheEnable {
}
3.6. SessionCacheEnableAspect
该类的主要方法是 sessionStart,它使用 @Around("@annotation(xxx.xxx.SessionCacheEnable)") 注解。这意味着该方法将在任何带有 SessionCacheEnable 注解的方法周围执行。
@Component
@Aspect
@Order(1)
public class SessionCacheEnableAspect {
/**
* 缓存开启
*/
@Around("@annotation(xxx.xxx.SessionCacheEnable)")
public Object sessionStart(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] args = joinPoint.getArgs();
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
Class> returnType = method.getReturnType();
try {
//获取是否禁用sessionCache
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method m = methodSignature.getMethod();
SessionDisable sessionDisable = m.getAnnotation(SessionDisable.class);
if (Objects.nonNull(sessionDisable)) {
return joinPoint.proceed(args);
}
String sessionId = MDC.get("traceId");
if (StringUtils.isEmpty(sessionId)){
return joinPoint.proceed(args);
}
LogUtils.COMMON.debug("sessionStart,method:{},traceId:{}", joinPoint.getSignature().getName(), sessionId);
TtlUtil.start(sessionId);
return joinPoint.proceed(args);
} catch (Exception e) {
LogUtils.ERROR.error("error:", e);
return wrapperRes(returnType,e);
} finally {
//清除内存中的副本数据
TtlUtil.clean();
}
}
}
3.7. SessionDisable
/**
* @Description: 如果不需要开启sessioncache添加该注解
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SessionDisable {
}
3.8. 示例:
QueryServiceImpl
@Service
public class QueryServiceImpl implements QueryService {
@SessionCacheEnable
public Response query(QueryRequest request) {
...
methodA(id);
...
}
}
methodA
@Autowired
private FacadeB facadeB;
@Override
public methodADTO methodA(Long id) {
...
facadeB.methodB(id);
...
}
methodB
@SessionCache
public methodBDTO methodB(id) {
...
}
4. 总结
该方法利用 TransmittableThreadLocal 的特性,实现了在一次会话中将多次请求合并的效果,对于该痛点问题十分奏效,大大降低不必要的资源浪费。该注解适合放在在上游Facade层或下游服务的Infrastructure层。