概述
公司的微服务都用了负载均衡,而配置信息的缓存用了本地缓存,这导致了如果在A微服务修改配置信息的时候,B微服务无法感知配置更新,导致B微服务会使用历史配置信息。
公司的做法
使用MySQL,建立主机缓存信息表
create table cache_config
(
id bigint not null comment '主键'
primary key,
app_name varchar(255) not null comment '应用名称',
host_id varchar(255) not null comment 'docker主机id',
host_ip varchar(255) not null comment '主机ip',
port int(6) not null comment '主机端口',
cache_name varchar(255) not null comment '缓存名称',
cache_status text null comment '缓存状态信息',
create_date datetime(3) not null comment '创建时间',
last_update datetime(3) not null comment '最后更新时间(5min上报一次缓存状态)'
)
collate = utf8mb4_unicode_ci;
每个微服务注册的时候,都会记录到MySQL
。当某个微服务修改配置时候,会查询MySQL
,获取所有微服务信息,然后通过HTTP Client
调用,通知其他微服务。这里使用了ServletRegistrationBean
注册Servlet
,而不是写个Controller接收。有兴趣的小伙伴可以查下ServletRegistrationBean
。
使用redis pub/sub
公司一开始的做法很简单,就是通过MySQL
保存微服务IP,端口和缓存信息
,通过HTTP Client
通知其他微服务。
但是很明显,做法绕了一大圈,性能也不高。不过更新配置信息不是频繁操作,所以现在使用也没大碍。然而使用redis pub/sub
,能显著提高性能,而且实现也没那么复杂,代码量大大降低,何乐而不为。
接下来,是我使用redis pub/sub
做配置信息变更推送的流程。
Redis缓存配置
/**
* redis缓存配置
*
* @author Terry
*/
@Configuration
public class RedisConfig {
/**
* 用于缓存通知
*
* @param lettuceConnectionFactory 缓存配置信息
* @return redisTemplate
*/
@Bean(name = "configNotifyTemplate")
public ReactiveRedisOperations<String, ConfigNotify> configNotifyTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisSerializationContext<String, ConfigNotify> serializationContext = RedisSerializationContext.<String, ConfigNotify>newSerializationContext(RedisSerializer.string())
.value(new Jackson2JsonRedisSerializer<>(ConfigNotify.class))
.build();
return new ReactiveRedisTemplate<>(lettuceConnectionFactory, serializationContext);
}
}
本地缓存配置
/**
* 本地缓存配置器
*
* @author Terry
*/
@Configuration
public class LocalCacheConfig {
/**
* 缓存加载最大重试次数
*/
private static final int CACHE_MAX_RETRY_TIMES = 5;
/**
* 接口编号-接口信息关联缓存
*/
private LoadingCache<Long, Optional<SaasInterfaceInfo>> interfaceInfos;
/**
* 接口Id-接口信息关联缓存
*
* @param interfaceRpc 获取接口信息
* @return 接口信息缓存
*/
@Bean
public LoadingCache<Long, Optional<SaasInterfaceInfo>> interfaceInfos(final InterfaceRpc interfaceRpc) {
interfaceInfos = Caffeine
.newBuilder()
.maximumSize(2000)
.expireAfterWrite(1, TimeUnit.HOURS)
.recordStats()
.build(new AbstractTryCacheLoader<Long, Optional<SaasInterfaceInfo>>(CACHE_MAX_RETRY_TIMES) {
@Override
public Optional<SaasInterfaceInfo> reLoad(Long key) throws Exception {
Optional<SaasInterfaceInfo> saasInterfaceInfo = interfaceRpc.getInterfaceInfo(key);
return saasInterfaceInfo;
}
});
return interfaceInfos;
}
}
这里对本地缓存加了@Bean,本地缓存注册Bean。方便后面通过Spring Bean获取本地缓存信息
配置通知类
/**
* 配置通知类
*
* @author Terry
*/
public class ConfigNotify {
public enum Type {
/**
* 更新(暂时停用,未开发)
*/
UPDATE,
/**
* 删除(暂时停用,未开发)
*/
DELETE,
/**
* 失效
*/
INVALID
}
public ConfigNotify() {
}
public ConfigNotify(Type type, String cacheName, String cacheKey) {
this.type = type;
this.cacheName = cacheName;
this.cacheKey = cacheKey;
}
private Type type;
private String cacheName;
private String cacheKey;
public Type getType() {
return type;
}
public void setType(Type type) {
this.type = type;
}
public String getCacheName() {
return cacheName;
}
public void setCacheName(String cacheName) {
this.cacheName = cacheName;
}
public String getCacheKey() {
return cacheKey;
}
public void setCacheKey(String cacheKey) {
this.cacheKey = cacheKey;
}
@Override
public String toString() {
return "ConfigNotify{" +
"type=" + type +
", cacheName='" + cacheName + '\'' +
", cacheKey='" + cacheKey + '\'' +
'}';
}
}
配置信息通知接口
/**
* 配置通知接口
*
* @author Terry
*/
public interface ConfigNotifyInterface {
/**
* 发布缓存失效通知
*
* @param cache 缓存对象
* @param cacheKey 缓存Key
*/
void publishChangeNotify(Cache cache, String cacheKey);
/**
* 发布清除缓存通知
*
* @param cache 缓存信息
*/
void publishClearNotify(Cache cache);
}
配置信息修改通知服务
/**
* 配置信息修改通知服务
*
* @author Terry
*/
@Component
public class ConfigNotifyService implements ConfigNotifyInterface {
private static final Logger logger = LoggerFactory.getLogger(ConfigNotifyService.class);
/**
* 绑定channel
*/
@Value("${topic.name:channel}")
private String topic;
/**
* 缓存
*/
private final ReactiveRedisOperations<String, ConfigNotify> configNotifyTemplate;
/**
* 应用缓存映射
*/
private final BiMap<String, Cache> LOCAL_CACHE_MAP;
private Disposable subscribe;
@Autowired
public ConfigNotifyService(@Qualifier(value = "configNotifyTemplate") ReactiveRedisOperations<String, ConfigNotify> configNotifyTemplate,
ApplicationContext context) {
this.configNotifyTemplate = configNotifyTemplate;
this.LOCAL_CACHE_MAP = HashBiMap.create();
this.LOCAL_CACHE_MAP.putAll(context.getBeansOfType(Cache.class));
}
/**
* 初始化监听器, 执行通知操作
*/
@PostConstruct
private void init() {
subscribe = this.configNotifyTemplate
.listenTo(ChannelTopic.of(topic))
.map(ReactiveSubscription.Message::getMessage)
.subscribe(this::receiveChangeNotify);
}
/**
* 从订阅移除
*/
@PreDestroy
public void shutdown() {
subscribe.dispose();
}
/**
* 接收缓存失效通知并处理
*
* @param configNotify 配置信息通知
*/
private void receiveChangeNotify(ConfigNotify configNotify) {
// 可以在这里打开日志查看配置通知信息
// logger.info(configNotify.toString());
ConfigNotify.Type type = configNotify.getType();
// 现在只用到使缓存失效,所以只有这判断
if (ConfigNotify.Type.INVALID.equals(type)) {
String cacheName = configNotify.getCacheName();
String cacheKey = configNotify.getCacheKey();
Cache cache = LOCAL_CACHE_MAP.get(cacheName);
if (cache != null) {
if (StringUtils.isNotBlank(cacheKey)) {
logger.debug("ConfigNotifyService invalidate CacheName: {}, CacheKey: {}", cacheName, cacheKey);
invalidate(cache, cacheKey);
} else {
logger.debug("ConfigNotifyService invalidate all CacheName: {}", cacheName);
cache.invalidateAll();
}
}
}
}
/**
* 发布缓存失效通知
*
* @param cache 缓存对象
* @param cacheKey 缓存Key
*/
@Override
public void publishChangeNotify(Cache cache, String cacheKey) {
if (cache == null) {
return;
}
String cacheName = LOCAL_CACHE_MAP.inverse().get(cache);
if (StringUtils.isEmpty(cacheName)) {
logger.error("LocalCacheMap获取应用缓存映射Key为空!!!");
return;
}
// redis订阅发布
cacheChangeNotice(cacheKey, cacheName);
}
/**
* 发布清除缓存通知
*
* @param cache 缓存信息
*/
@Override
public void publishClearNotify(Cache cache) {
publishChangeNotify(cache, "");
}
private void cacheChangeNotice(String cacheKey, String cacheName) {
ConfigNotify configNotify = new ConfigNotify();
configNotify.setType(ConfigNotify.Type.INVALID);
configNotify.setCacheName(cacheName);
configNotify.setCacheKey(cacheKey);
configNotifyTemplate.convertAndSend(topic, configNotify).subscribe();
}
/**
* 使缓存失效
*
* @param cache 缓存信息
* @param cacheKey 缓存Key
*/
private void invalidate(Cache cache, String cacheKey) {
if (StringUtils.isNotBlank(cacheKey)) {
cache.invalidate(cacheKey);
if (StringUtils.isNumeric(cacheKey)) {
long key = Long.parseLong(cacheKey);
cache.invalidate(key);
if (key <= Integer.MAX_VALUE) {
cache.invalidate(Integer.parseInt(cacheKey));
}
}
}
}
}
测试结果
通过三个的微服务分别调用一次缓存失效通知,都会通知已经订阅相同频道的微服务。如果自身也有接收方法,则也会调用到自身方法,本次调用是本地调用,通知其他订阅者是通过redis pub/sub。
总结
redis pub/sub
很好用,使用方便并且性能高,发布的消息会在socket buffers
中缓冲,并且会立即发送给订阅者,相比公司之前使用的方法利索很多很多。redis pub/sub
也是有缺点,如果某个微服务停止后,正好发送消息,这时候重启完成后接收不到消息。不过我们这里配置信息会每次启动都会全量查询一次MySQL,所以使用redis pub/sub
没问题。
redis的哨兵集群的组成也是基于redis pub/sub
。哨兵之间会通过相同频道获取其他哨兵的IP和端口进行网络连接。(哨兵出了彼此之间建立连接形成集群外,还会通过从库建立连接,主从库切换后会通知从库和新的主库进行同步。这里是通过向主库发送INFO
命令完成的,这里就不再细讲了)
评论区