侧边栏壁纸
博主头像
Terry

『LESSON 5』

  • 累计撰写 90 篇文章
  • 累计创建 21 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

配置修改后使用redis pub/sub 通知其他微服务

Terry
2020-12-12 / 0 评论 / 0 点赞 / 848 阅读 / 8,744 字 / 正在检测是否收录...

概述

公司的微服务都用了负载均衡,而配置信息的缓存用了本地缓存,这导致了如果在A微服务修改配置信息的时候,B微服务无法感知配置更新,导致B微服务会使用历史配置信息。

公司的做法

使用MySQL,建立主机缓存信息表

create table cache_config
(
    id           bigint       not null comment '主键'
        primary key,
    app_name     varchar(255) not null comment '应用名称',
    host_id      varchar(255) not null comment 'docker主机id',
    host_ip      varchar(255) not null comment '主机ip',
    port         int(6)       not null comment '主机端口',
    cache_name   varchar(255) not null comment '缓存名称',
    cache_status text         null comment '缓存状态信息',
    create_date  datetime(3)  not null comment '创建时间',
    last_update  datetime(3)  not null comment '最后更新时间(5min上报一次缓存状态)'
)
    collate = utf8mb4_unicode_ci;

每个微服务注册的时候,都会记录到MySQL。当某个微服务修改配置时候,会查询MySQL,获取所有微服务信息,然后通过HTTP Client调用,通知其他微服务。这里使用了ServletRegistrationBean注册Servlet,而不是写个Controller接收。有兴趣的小伙伴可以查下ServletRegistrationBean

使用redis pub/sub

公司一开始的做法很简单,就是通过MySQL保存微服务IP,端口和缓存信息,通过HTTP Client通知其他微服务。
但是很明显,做法绕了一大圈,性能也不高。不过更新配置信息不是频繁操作,所以现在使用也没大碍。然而使用redis pub/sub,能显著提高性能,而且实现也没那么复杂,代码量大大降低,何乐而不为。
接下来,是我使用redis pub/sub做配置信息变更推送的流程。

Redis缓存配置

/**
 * redis缓存配置
 *
 * @author Terry
 */
@Configuration
public class RedisConfig {

    /**
     * 用于缓存通知
     *
     * @param lettuceConnectionFactory 缓存配置信息
     * @return redisTemplate
     */
    @Bean(name = "configNotifyTemplate")
    public ReactiveRedisOperations<String, ConfigNotify> configNotifyTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        RedisSerializationContext<String, ConfigNotify> serializationContext = RedisSerializationContext.<String, ConfigNotify>newSerializationContext(RedisSerializer.string())
                .value(new Jackson2JsonRedisSerializer<>(ConfigNotify.class))
                .build();
        return new ReactiveRedisTemplate<>(lettuceConnectionFactory, serializationContext);
    }
}

本地缓存配置

/**
 * 本地缓存配置器
 *
 * @author Terry
 */
@Configuration
public class LocalCacheConfig {

    /**
     * 缓存加载最大重试次数
     */
    private static final int CACHE_MAX_RETRY_TIMES = 5;

    /**
     * 接口编号-接口信息关联缓存
     */
    private LoadingCache<Long, Optional<SaasInterfaceInfo>> interfaceInfos;

    /**
     * 接口Id-接口信息关联缓存
     *
     * @param interfaceRpc 获取接口信息
     * @return 接口信息缓存
     */
    @Bean
    public LoadingCache<Long, Optional<SaasInterfaceInfo>> interfaceInfos(final InterfaceRpc interfaceRpc) {
        interfaceInfos = Caffeine
                .newBuilder()
                .maximumSize(2000)
                .expireAfterWrite(1, TimeUnit.HOURS)
                .recordStats()
                .build(new AbstractTryCacheLoader<Long, Optional<SaasInterfaceInfo>>(CACHE_MAX_RETRY_TIMES) {
                    @Override
                    public Optional<SaasInterfaceInfo> reLoad(Long key) throws Exception {
                        Optional<SaasInterfaceInfo> saasInterfaceInfo = interfaceRpc.getInterfaceInfo(key);
                        return saasInterfaceInfo;
                    }
                });
        return interfaceInfos;
    }
}

这里对本地缓存加了@Bean,本地缓存注册Bean。方便后面通过Spring Bean获取本地缓存信息

配置通知类

/**
 * 配置通知类
 *
 * @author Terry
 */
public class ConfigNotify {

    public enum Type {

        /**
         * 更新(暂时停用,未开发)
         */
        UPDATE,

        /**
         * 删除(暂时停用,未开发)
         */
        DELETE,

        /**
         * 失效
         */
        INVALID
    }

    public ConfigNotify() {
    }

    public ConfigNotify(Type type, String cacheName, String cacheKey) {
        this.type = type;
        this.cacheName = cacheName;
        this.cacheKey = cacheKey;
    }

    private Type type;

    private String cacheName;

    private String cacheKey;

    public Type getType() {
        return type;
    }

    public void setType(Type type) {
        this.type = type;
    }

    public String getCacheName() {
        return cacheName;
    }

    public void setCacheName(String cacheName) {
        this.cacheName = cacheName;
    }

    public String getCacheKey() {
        return cacheKey;
    }

    public void setCacheKey(String cacheKey) {
        this.cacheKey = cacheKey;
    }

    @Override
    public String toString() {
        return "ConfigNotify{" +
                "type=" + type +
                ", cacheName='" + cacheName + '\'' +
                ", cacheKey='" + cacheKey + '\'' +
                '}';
    }
}

配置信息通知接口

/**
 * 配置通知接口
 *
 * @author Terry
 */
public interface ConfigNotifyInterface {

    /**
     * 发布缓存失效通知
     *
     * @param cache    缓存对象
     * @param cacheKey 缓存Key
     */
    void publishChangeNotify(Cache cache, String cacheKey);

    /**
     * 发布清除缓存通知
     *
     * @param cache 缓存信息
     */
    void publishClearNotify(Cache cache);
}

配置信息修改通知服务

/**
 * 配置信息修改通知服务
 *
 * @author Terry
 */
@Component
public class ConfigNotifyService implements ConfigNotifyInterface {

    private static final Logger logger = LoggerFactory.getLogger(ConfigNotifyService.class);

    /**
     * 绑定channel
     */
    @Value("${topic.name:channel}")
    private String topic;

    /**
     * 缓存
     */
    private final ReactiveRedisOperations<String, ConfigNotify> configNotifyTemplate;

    /**
     * 应用缓存映射
     */
    private final BiMap<String, Cache> LOCAL_CACHE_MAP;

    private Disposable subscribe;

    @Autowired
    public ConfigNotifyService(@Qualifier(value = "configNotifyTemplate") ReactiveRedisOperations<String, ConfigNotify> configNotifyTemplate,
                               ApplicationContext context) {
        this.configNotifyTemplate = configNotifyTemplate;
        this.LOCAL_CACHE_MAP = HashBiMap.create();
        this.LOCAL_CACHE_MAP.putAll(context.getBeansOfType(Cache.class));
    }

    /**
     * 初始化监听器, 执行通知操作
     */
    @PostConstruct
    private void init() {
        subscribe = this.configNotifyTemplate
                .listenTo(ChannelTopic.of(topic))
                .map(ReactiveSubscription.Message::getMessage)
                .subscribe(this::receiveChangeNotify);
    }

    /**
     * 从订阅移除
     */
    @PreDestroy
    public void shutdown() {
        subscribe.dispose();
    }

    /**
     * 接收缓存失效通知并处理
     *
     * @param configNotify 配置信息通知
     */
    private void receiveChangeNotify(ConfigNotify configNotify) {
	// 可以在这里打开日志查看配置通知信息
	// logger.info(configNotify.toString());
        ConfigNotify.Type type = configNotify.getType();
 	// 现在只用到使缓存失效,所以只有这判断
        if (ConfigNotify.Type.INVALID.equals(type)) {
            String cacheName = configNotify.getCacheName();
            String cacheKey = configNotify.getCacheKey();
            Cache cache = LOCAL_CACHE_MAP.get(cacheName);
            if (cache != null) {
                if (StringUtils.isNotBlank(cacheKey)) {
                    logger.debug("ConfigNotifyService invalidate CacheName: {}, CacheKey: {}", cacheName, cacheKey);
                    invalidate(cache, cacheKey);
                } else {
                    logger.debug("ConfigNotifyService invalidate all CacheName: {}", cacheName);
                    cache.invalidateAll();
                }
            }
        }
    }

    /**
     * 发布缓存失效通知
     *
     * @param cache    缓存对象
     * @param cacheKey 缓存Key
     */
    @Override
    public void publishChangeNotify(Cache cache, String cacheKey) {
        if (cache == null) {
            return;
        }
        String cacheName = LOCAL_CACHE_MAP.inverse().get(cache);
        if (StringUtils.isEmpty(cacheName)) {
            logger.error("LocalCacheMap获取应用缓存映射Key为空!!!");
            return;
        }
        // redis订阅发布
        cacheChangeNotice(cacheKey, cacheName);
    }

    /**
     * 发布清除缓存通知
     *
     * @param cache 缓存信息
     */
    @Override
    public void publishClearNotify(Cache cache) {
        publishChangeNotify(cache, "");
    }

    private void cacheChangeNotice(String cacheKey, String cacheName) {
        ConfigNotify configNotify = new ConfigNotify();
        configNotify.setType(ConfigNotify.Type.INVALID);
        configNotify.setCacheName(cacheName);
        configNotify.setCacheKey(cacheKey);
        configNotifyTemplate.convertAndSend(topic, configNotify).subscribe();
    }

    /**
     * 使缓存失效
     *
     * @param cache    缓存信息
     * @param cacheKey 缓存Key
     */
    private void invalidate(Cache cache, String cacheKey) {
        if (StringUtils.isNotBlank(cacheKey)) {
            cache.invalidate(cacheKey);
            if (StringUtils.isNumeric(cacheKey)) {
                long key = Long.parseLong(cacheKey);
                cache.invalidate(key);
                if (key <= Integer.MAX_VALUE) {
                    cache.invalidate(Integer.parseInt(cacheKey));
                }
            }
        }
    }
}

测试结果

saas-app-one

saas-app-two

saas-app-three
通过三个的微服务分别调用一次缓存失效通知,都会通知已经订阅相同频道的微服务。如果自身也有接收方法,则也会调用到自身方法,本次调用是本地调用,通知其他订阅者是通过redis pub/sub。

总结

redis pub/sub很好用,使用方便并且性能高,发布的消息会在socket buffers中缓冲,并且会立即发送给订阅者,相比公司之前使用的方法利索很多很多。redis pub/sub也是有缺点,如果某个微服务停止后,正好发送消息,这时候重启完成后接收不到消息。不过我们这里配置信息会每次启动都会全量查询一次MySQL,所以使用redis pub/sub没问题。
redis的哨兵集群的组成也是基于redis pub/sub。哨兵之间会通过相同频道获取其他哨兵的IP和端口进行网络连接。(哨兵出了彼此之间建立连接形成集群外,还会通过从库建立连接,主从库切换后会通知从库和新的主库进行同步。这里是通过向主库发送INFO命令完成的,这里就不再细讲了)

0

评论区