Sentinel规则持久化Push模式两种实现方式
文章目录
- sentinel持久化push推模式
- 微服务端的实现
- 具体实现
- 源码分析
- 读数据源
- 写数据源的实现
- 微服务端解析读数据源流程
- 修改源码的实现
- 官方demo
- 修改源码实现
- 配置类
- flow
- authority
- degread
- param
- system
- gateway
- 修改源码
- 测试
- 补充
前置知识 pull模式
sentinel持久化push推模式
pull拉模式的缺点,以保存本地文件举例:
- 定时任务是每隔3s执行一次,去判断规则持久化文件的最后修改时间。这里有一定时间的延迟,但如果时间设置的太短,有影响服务器的性能
- 我们的微服务是集群部署的,其他服务实例可读取不到我这台服务器的本地文件
所以还有一种push推送模式。我们一般会引入第三方中间件来实现,以Nacos为例。我们修改了nacos中的配置,它就会将更新后的数据推送给微服务。
push模式有两种实现方式:
-
在微服务端添加读数据源,为dataId添加监听器,当规则配置文件更改之后我就获取到更改后的规则内存并更新内存中的数据;再添加一个写数据源,每当dashboard中更新了规则,我除了更新内存中的数据之外,我通过ConfigService.publishConfig()方法还往Nacos端进行写入
-
在dashboard源码中进行更改,在获取规则内容、更新规则内容的接口中,不要和微服务端进行交互,直接去和Nacos通信,通过ConfigService.publishConfig()和ConfigService.getConfig()来实现。这种方式主要注意dashboard端的规则实体对象和微服务端的规则实体对象不一致问题,需要经过转换相关的操作。sentinel默认情况下就直接把规则实体转换为json字符串推送给Nacos,Nacos配置文件更改了,又推送给微服务,微服务这边再把json字符串转换为规则实体对象这一步就会发现,转换失败了,某些属性对应不上。进而就导致了dashboard端设置的规则在微服务这边未生效。
微服务端的实现
具体实现
引入读数据源的依赖
com.alibaba.csp sentinel-datasource-nacos配置文件中添加规则持久化的dataId
server: port: 8806 spring: application: name: mall-user-sentinel-rule-push #微服务名称 #配置nacos注册中心地址 cloud: nacos: discovery: server-addr: 127.0.0.1:8848 sentinel: transport: # 添加sentinel的控制台地址 dashboard: 127.0.0.1:8080 datasource: # 名称自定义,可以随便定义字符串 flow-rules: nacos: server-addr: 127.0.0.1:8848 # dataId取了微服务名字,后面再拼接字符串 dataId: ${spring.application.name}-flow-rules # 我这里在Nacos配置中心,单独使用了一个组 groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: flow degrade-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-degrade-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: degrade param-flow-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-param-flow-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: param-flow authority-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-authority-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: authority system-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-system-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: system在Nacos配置中心中创建对应的配置文件
编写java类,定义写数据源
import com.alibaba.cloud.sentinel.SentinelProperties; import com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 添加往Nacos的写数据源,只不过未使用InitFunc * 如果要使用就需要放开注解 */ @Configuration(proxyBeanMethods = false) @AutoConfigureAfter(SentinelAutoConfiguration.class) public class SentinelNacosDataSourceConfiguration { @Bean @ConditionalOnMissingBean public SentinelNacosDataSourceHandler sentinelNacosDataSourceHandler(SentinelProperties sentinelProperties) { return new SentinelNacosDataSourceHandler(sentinelProperties); } }import com.alibaba.cloud.sentinel.SentinelProperties; import com.alibaba.cloud.sentinel.datasource.RuleType; import com.alibaba.cloud.sentinel.datasource.config.DataSourcePropertiesConfiguration; import com.alibaba.cloud.sentinel.datasource.config.NacosDataSourceProperties; import com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler; import com.alibaba.csp.sentinel.datasource.WritableDataSource; import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule; import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; import com.alibaba.csp.sentinel.slots.system.SystemRule; import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry; import com.alibaba.fastjson.JSON; import org.springframework.beans.factory.SmartInitializingSingleton; import java.util.List; /** * sentinel 规则持久化到 nacos配置中心 */ public class SentinelNacosDataSourceHandler implements SmartInitializingSingleton { private final SentinelProperties sentinelProperties; public SentinelNacosDataSourceHandler(SentinelProperties sentinelProperties) { this.sentinelProperties = sentinelProperties; } @Override public void afterSingletonsInstantiated() { // 遍历我们配置文件中指定的多个spring.cloud.sentinel.datasource的多个配置 sentinelProperties.getDatasource().values().forEach(this::registryWriter); } private void registryWriter(DataSourcePropertiesConfiguration dataSourceProperties) { // 只获取application.yml文件中 nacos配置的数据源 final NacosDataSourceProperties nacosDataSourceProperties = dataSourceProperties.getNacos(); if (nacosDataSourceProperties == null) { return; } // 获取规则类型,然后根据各个类型创建相应的写数据源 final RuleType ruleType = nacosDataSourceProperties.getRuleType(); switch (ruleType) { case FLOW: WritableDataSource flowRuleWriter = new NacosWritableDataSource(nacosDataSourceProperties, JSON::toJSONString); WritableDataSourceRegistry.registerFlowDataSource(flowRuleWriter); break; case DEGRADE: WritableDataSource degradeRuleWriter = new NacosWritableDataSource(nacosDataSourceProperties, JSON::toJSONString); WritableDataSourceRegistry.registerDegradeDataSource(degradeRuleWriter); break; case PARAM_FLOW: WritableDataSource paramFlowRuleWriter = new NacosWritableDataSource(nacosDataSourceProperties, JSON::toJSONString); ModifyParamFlowRulesCommandHandler.setWritableDataSource(paramFlowRuleWriter); break; case SYSTEM: WritableDataSource systemRuleWriter = new NacosWritableDataSource(nacosDataSourceProperties, JSON::toJSONString); WritableDataSourceRegistry.registerSystemDataSource(systemRuleWriter); break; case AUTHORITY: WritableDataSource authRuleWriter = new NacosWritableDataSource(nacosDataSourceProperties, JSON::toJSONString); WritableDataSourceRegistry.registerAuthorityDataSource(authRuleWriter); break; default: break; } } }import com.alibaba.cloud.sentinel.datasource.config.NacosDataSourceProperties; import com.alibaba.csp.sentinel.datasource.Converter; import com.alibaba.csp.sentinel.datasource.WritableDataSource; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.exception.NacosException; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; import java.util.Properties; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 将sentinel规则写入到nacos配置中心 * @param */ @Slf4j public class NacosWritableDataSource implements WritableDataSource { private final Converter configEncoder; private final NacosDataSourceProperties nacosDataSourceProperties; private final Lock lock = new ReentrantLock(true); private ConfigService configService = null; public NacosWritableDataSource(NacosDataSourceProperties nacosDataSourceProperties, Converter configEncoder) { if (configEncoder == null) { throw new IllegalArgumentException("Config encoder cannot be null"); } if (nacosDataSourceProperties == null) { throw new IllegalArgumentException("Config nacosDataSourceProperties cannot be null"); } this.configEncoder = configEncoder; this.nacosDataSourceProperties = nacosDataSourceProperties; final Properties properties = buildProperties(nacosDataSourceProperties); try { // 也可以直接注入NacosDataSource,然后反射获取其configService属性 this.configService = NacosFactory.createConfigService(properties); } catch (NacosException e) { log.error("create configService failed.", e); } } private Properties buildProperties(NacosDataSourceProperties nacosDataSourceProperties) { Properties properties = new Properties(); if (!StringUtils.isEmpty(nacosDataSourceProperties.getServerAddr())) { properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDataSourceProperties.getServerAddr()); } else { properties.setProperty(PropertyKeyConst.ACCESS_KEY, nacosDataSourceProperties.getAccessKey()); properties.setProperty(PropertyKeyConst.SECRET_KEY, nacosDataSourceProperties.getSecretKey()); properties.setProperty(PropertyKeyConst.ENDPOINT, nacosDataSourceProperties.getEndpoint()); } if (!StringUtils.isEmpty(nacosDataSourceProperties.getNamespace())) { properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDataSourceProperties.getNamespace()); } if (!StringUtils.isEmpty(nacosDataSourceProperties.getUsername())) { properties.setProperty(PropertyKeyConst.USERNAME, nacosDataSourceProperties.getUsername()); } if (!StringUtils.isEmpty(nacosDataSourceProperties.getPassword())) { properties.setProperty(PropertyKeyConst.PASSWORD, nacosDataSourceProperties.getPassword()); } return properties; } @Override public void write(T value) throws Exception { lock.lock(); // todo handle cluster concurrent problem try { String convertResult = configEncoder.convert(value); if (configService == null) { log.error("configServer is null, can not continue."); return; } // 规则配置数据推送到nacos配置中心 final boolean published = configService.publishConfig(nacosDataSourceProperties.getDataId(), nacosDataSourceProperties.getGroupId(), convertResult); if (!published) { log.error("sentinel {} publish to nacos failed.", nacosDataSourceProperties.getRuleType()); } } finally { lock.unlock(); } } @Override public void close() throws Exception { } }启动微服务进行测试。
dashboard中为某个接口定义一个流控规则
调用接口测试,发送三次请求
查看Nacos中的配置文件,就会发现也成功写入了
源码分析
读数据源
引入读数据源的依赖,我们来看看具体是怎么实现的
com.alibaba.csp sentinel-datasource-nacos实现思路:
- 和文件的读数据源一样,继承了AbstractDataSource类,这样就不需要我们再去写一遍加载配置、更新内存中的配置
在源码中的这个扩展包下面,就有nacos读数据源的实现
我们先看看NacosDataSource类的父类的代码
- 创建一个DynamicSentinelProperty对象,主要作用是更新内存中的规则配置
- 加载配置、解析配置
public abstract class AbstractDataSource implements ReadableDataSource { protected final Converter parser; protected final SentinelProperty property; public AbstractDataSource(Converter parser) { if (parser == null) { throw new IllegalArgumentException("parser can't be null"); } // 子类传过来的解析器 this.parser = parser; // 更新内存中的配置 // 我们会经常看见 getProperty().updateValue(newValue); 这样的代码 this.property = new DynamicSentinelProperty(); } @Override public T loadConfig() throws Exception { // 调用子类的readSource()方法,一般会得到一个String, // 在通过解析器Converter 并解析配置转换成对应的对象 return loadConfig(readSource()); } public T loadConfig(S conf) throws Exception { // 解析配置 T value = parser.convert(conf); return value; } @Override public SentinelProperty getProperty() { return property; } }读配置源的具体实现:
- 通过Nacos的serverAddr构建一个Properties对象,该对象会用于初始化ConfigService接口的对象
- 利用线程池中唯一一个线程,创建一个监听器,监听dataId,当配置中心的配置更改后就会调用微服务客户端,微服务客户端这边有一个while+阻塞队列实现的轮询机制,它调用监听器的方法,监听器里面会更新内存中的规则配置
- 初始化configService对象,并通过configService.addListener(…)为指定的dataId添加监听器
- 微服务刚启动会调用父类的loadConfig()方法,父类最终又会调用本类中的readSource()方法得到配置中心中的数据,并进行解析;再更新内存中的规则配置
public class NacosDataSource extends AbstractDataSource { private static final int DEFAULT_TIMEOUT = 3000; // 创建一个只有一个线程的线程池,用来执行dataId的监听器 private final ExecutorService pool = new ThreadPoolExecutor(...); private final Listener configListener; private final String groupId; private final String dataId; private final Properties properties; private ConfigService configService = null; public NacosDataSource(final String serverAddr, final String groupId, final String dataId,Converter parser) { this(NacosDataSource.buildProperties(serverAddr), groupId, dataId, parser); } public NacosDataSource(final Properties properties, final String groupId, final String dataId,Converter parser) { super(parser); if (StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) { throw new IllegalArgumentException(...); } AssertUtil.notNull(properties, "Nacos properties must not be null, you could put some keys from PropertyKeyConst"); this.groupId = groupId; this.dataId = dataId; this.properties = properties; // 创建一个监听器 this.configListener = new Listener() { @Override public Executor getExecutor() { return pool; } @Override public void receiveConfigInfo(final String configInfo) { RecordLog.info(...); // 通过转换器进行转换 T newValue = NacosDataSource.this.parser.convert(configInfo); // 调用父类的SentinelProperty对象,更新内存中的规则配置 getProperty().updateValue(newValue); } }; // 初始化configService对象,并通过configService.addListener(..)为指定的dataId添加监听器 initNacosListener(); // 微服务刚启动,会从Nacos配置中心加载一次配置 loadInitialConfig(); } private void loadInitialConfig() { try { // 调用父类的loadConfig() 父类最终又会调用本类中的readSource()方法得到配置中心中的数据,并进行解析 T newValue = loadConfig(); if (newValue == null) { RecordLog.warn("[NacosDataSource] WARN: initial config is null, you may have to check your data source"); } // 调用父类的SentinelProperty对象,更新内存中的规则配置 getProperty().updateValue(newValue); } catch (Exception ex) { RecordLog.warn("[NacosDataSource] Error when loading initial config", ex); } } private void initNacosListener() { try { // 初始化configService对象 this.configService = NacosFactory.createConfigService(this.properties); // Add config listener. // 通过configService.addListener(..)为指定的dataId添加监听器 configService.addListener(dataId, groupId, configListener); } catch (Exception e) { RecordLog.warn("[NacosDataSource] Error occurred when initializing Nacos data source", e); e.printStackTrace(); } } @Override public String readSource() throws Exception { if (configService == null) { throw new IllegalStateException("Nacos config service has not been initialized or error occurred"); } // 通过ConfigService接口中的getConfig()方法,从Nacos配置中心获取配置 return configService.getConfig(dataId, groupId, DEFAULT_TIMEOUT); } @Override public void close() { if (configService != null) { configService.removeListener(dataId, groupId, configListener); } pool.shutdownNow(); } private static Properties buildProperties(String serverAddr) { // 构建一个Properties对象,该对象会在初始化ConfigService时会用上 Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverAddr); return properties; } }写数据源的实现
写数据源源码实现流程相对简单。我们知道dashboard更新配置后调用微服务端,微服务这边的ModifyRulesCommandHandler类会处理规则更改的请求。这里会有一个写数据源相关的操作
// 注意name = "setRules",这就是控制台请求服务端的url路径 @CommandMapping(name = "setRules", desc = "modify the rules, accept param: type={ruleType}&data={ruleJson}") public class ModifyRulesCommandHandler implements CommandHandler { public CommandResponse handle(CommandRequest request) { //...... // 处理流控规则 if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) { List flowRules = JSONArray.parseArray(data, FlowRule.class); FlowRuleManager.loadRules(flowRules); // 关键一步,这里会有一个写数据源的操作。默认情况下是没有WritableDataSource,我们可以在这里进行扩展 if (!writeToDataSource(getFlowDataSource(), flowRules)) { result = WRITE_DS_FAILURE_MSG; } return CommandResponse.ofSuccess(result); // 处理权限规则 } else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) { ... // 处理熔断规则 } else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) { ... // 处理系统规则 } else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) { ... } return CommandResponse.ofFailure(new IllegalArgumentException("invalid type")); } }所以我们要做的事情就是创建一个写数据源,并进行注册写数据源WritableDataSourceRegistry。我们先来看看源码中的Demo,通过读写文件的方式实现的读写数据源。
public void init() throws Exception { // 文件保存路径 String flowRuleDir = System.getProperty("user.home") + File.separator + "sentinel" + File.separator + "rules"; String flowRuleFile = "flowRule.json"; String flowRulePath = flowRuleDir + File.separator + flowRuleFile; // 添加读数据源 ReadableDataSource ds = new FileRefreshableDataSource( flowRulePath, source -> JSON.parseObject(source, new TypeReference() {}) ); FlowRuleManager.register2Property(ds.getProperty()); // 添加写数据源 WritableDataSource wds = new FileWritableDataSource(flowRulePath, this::encodeJson); WritableDataSourceRegistry.registerFlowDataSource(wds); }我在定义一个往Nacos的写数据源,一个简单的实现,具体项目中能用的请参考上面 一章 。这里只是用更少的代码来理解nacos的写数据源
import com.alibaba.csp.sentinel.datasource.WritableDataSource; import com.alibaba.csp.sentinel.init.InitFunc; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry; import com.alibaba.fastjson.JSON; import java.util.List; public class NacosDataSourceInitFunc implements InitFunc { @Override public void init() throws Exception { //流控规则 WritableDataSource writableDataSource = new NacosWritableDataSource( "127.0.0.1:8848", "DEFAULT_GROUP", "mall-user-sentinel-rule-push-demo-flow", JSON::toJSONString); WritableDataSourceRegistry.registerFlowDataSource(writableDataSource); } }import com.alibaba.csp.sentinel.datasource.Converter; import com.alibaba.csp.sentinel.datasource.WritableDataSource; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.ConfigType; import com.alibaba.nacos.api.exception.NacosException; import java.util.Properties; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class NacosWritableDataSource implements WritableDataSource { private final String serverAddr; private final String groupId; private final String dataId; private final Properties properties; private ConfigService configService; private final Converter configEncoder; private final Lock lock = new ReentrantLock(true); public NacosWritableDataSource(String serverAddr, String groupId, String dataId, Converter configEncoder) { this.serverAddr = serverAddr; this.groupId = groupId; this.dataId = dataId; // 通过serverAddr构建一个properties对象 this.properties = NacosWritableDataSource.buildProperties(serverAddr); this.configEncoder = configEncoder; initConfigService(); } private void initConfigService() { try { // 通过properties对象初始化ConfigService this.configService = NacosFactory.createConfigService(properties); } catch (NacosException e) { e.printStackTrace(); } } private static Properties buildProperties(String serverAddr) { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverAddr); return properties; } @Override public void write(T t) throws Exception { lock.lock(); try { // 通过ConfigService往Nacos配置中心写入数据 configService.publishConfig(dataId, groupId, this.configEncoder.convert(t), ConfigType.JSON.getType()); } finally { lock.unlock(); } } @Override public void close() throws Exception { } }微服务端解析读数据源流程
我们引入了下面的依赖
com.alibaba.csp sentinel-datasource-nacos并在配置文件中指定了多个读数据源。这些数据源是如何创建的嘞?
server: port: 8806 spring: application: name: mall-user-sentinel-rule-push #微服务名称 #配置nacos注册中心地址 cloud: nacos: discovery: server-addr: 127.0.0.1:8848 sentinel: transport: # 添加sentinel的控制台地址 dashboard: 127.0.0.1:8080 datasource: # 名称自定义,可以随便定义字符串 # 每一个都是一个读数据源 flow-rules: nacos: server-addr: 127.0.0.1:8848 # dataId取了微服务名字,后面再拼接字符串 dataId: ${spring.application.name}-flow-rules # 我这里在Nacos配置中心,单独使用了一个组 groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: flow # 读数据源 degrade-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-degrade-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: degrade param-flow-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-param-flow-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: param-flow authority-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-authority-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: authority system-rules: nacos: server-addr: 127.0.0.1:8848 dataId: ${spring.application.name}-system-rules groupId: SENTINEL_GROUP username: nacos password: nacos data-type: json rule-type: system源码的入口是SentinelDataSourceHandler类,它实现了SmartInitializingSingleton接口,这是Spring中的接口,所有非懒加载单例bean创建完成之后会调用这个接口的实现类:
- 在构造函数中依赖注入SentinelProperties对象,该对象中保存了我们配置文件中所有读数据源的配置
- 遍历SentinelProperties对象中的读数据源,并为每一个读数据源生成一个beanName
- 为每一个读数据源对象 + beanName 创建一个BeanDefinition
- 将BeanDefinition添加进BeanFactory中
- BeanFactory.getBean(beanName) 创建读数据源对象。该对象其实是FactoryBean类型的
- 上方的getBean()方法最终会调用至NacosDataSourceFactoryBean.getObject()方法,在这里创建NacosDataSource对象。该对象就是上方引入maven依赖中的读数据源对象。
public class SentinelDataSourceHandler implements SmartInitializingSingleton { //...... // SentinelProperties中保存着Map datasource // 也就是我们上方yml文件中定义的多个数据源,我们自定义的名字就是String private final SentinelProperties sentinelProperties; // 构造方法中进行依赖注入 sentinelProperties对象 public SentinelDataSourceHandler(DefaultListableBeanFactory beanFactory,SentinelProperties sentinelProperties,...) { //... this.sentinelProperties = sentinelProperties; } // 遍历Map集合,最终取出我们的每一个配置的数据源 @Override public void afterSingletonsInstantiated() { sentinelProperties.getDatasource().forEach((dataSourceName, dataSourceProperties) -> { try { List validFields = dataSourceProperties.getValidField(); // ... // AbstractDataSourceProperties就是我们在配置文件中具体的每一个配置对象的公共父类 AbstractDataSourceProperties abstractDataSourceProperties = dataSourceProperties .getValidDataSourceProperties(); abstractDataSourceProperties.setEnv(env); abstractDataSourceProperties.preCheck(dataSourceName); // 把我们配置的每一个数据源,还有这里字符串凭借的一个beanName。调用下面的registerBean()方法 // beanName为 flow-rules + "-sentinel-" + nacos + "-datasource" // flow-rules是我们在yml文件中自定义的名字,nacos就是下面的validFields.get(0)值 registerBean(abstractDataSourceProperties, dataSourceName+ "-sentinel-" + validFields.get(0) + "-datasource"); } catch (Exception e) { log.error(...); } }); } private void registerBean(final AbstractDataSourceProperties dataSourceProperties,String dataSourceName) { // 对我们的数据源生成一个BeanDefinition BeanDefinitionBuilder builder = parseBeanDefinition(dataSourceProperties,dataSourceName); // 将BeanDefinition添加进BeanFactory中 this.beanFactory.registerBeanDefinition(dataSourceName,builder.getBeanDefinition()); // 通过beanFactory.getBean(dataSourceName)方法,创建bean对象 // 我们配置文件中定义的每一个读数据源就变为了一个一个的bean // 注意,我们的读数据源它是一个FactoryBean,这里的getBean()方法最终会去到NacosDataSourceFactoryBean.getObject() AbstractDataSource newDataSource = (AbstractDataSource) this.beanFactory.getBean(dataSourceName); // 将读数据源添加进对应的规则管理器中 dataSourceProperties.postRegister(newDataSource); }public class NacosDataSourceFactoryBean implements FactoryBean { //...... @Override public NacosDataSource getObject() throws Exception { // 为properties对象赋值 Properties properties = new Properties(); if (!StringUtils.isEmpty(this.serverAddr)) { properties.setProperty(PropertyKeyConst.SERVER_ADDR, this.serverAddr); } else { properties.setProperty(PropertyKeyConst.ENDPOINT, this.endpoint); } if (!StringUtils.isEmpty(this.contextPath)) { properties.setProperty(PropertyKeyConst.CONTEXT_PATH, this.contextPath); } if (!StringUtils.isEmpty(this.accessKey)) { properties.setProperty(PropertyKeyConst.ACCESS_KEY, this.accessKey); } if (!StringUtils.isEmpty(this.secretKey)) { properties.setProperty(PropertyKeyConst.SECRET_KEY, this.secretKey); } if (!StringUtils.isEmpty(this.namespace)) { properties.setProperty(PropertyKeyConst.NAMESPACE, this.namespace); } if (!StringUtils.isEmpty(this.username)) { properties.setProperty(PropertyKeyConst.USERNAME, this.username); } if (!StringUtils.isEmpty(this.password)) { properties.setProperty(PropertyKeyConst.PASSWORD, this.password); } // 创建一个Nacos读数据源对象,这里也就是上方: —— 的那一个对象 return new NacosDataSource(properties, groupId, dataId, converter); } // ...... }修改源码的实现
我们需要在Sentinel源码中进行修改,将dashboard和微服务之间的通信,改为dashboard和nacos的通信。在通过Nacos配置中心的推送机制去更新微服务内存中的规则配置。
从 Sentinel 1.4.0 开始,Sentinel 控制台提供 DynamicRulePublisher 和 DynamicRuleProvider 接口用于实现应用维度的规则推送和拉取:
-
DynamicRuleProvider: 拉取规则
-
DynamicRulePublisher: 推送规则
在dashboard工程下的com.alibaba.csp.sentinel.dashboard.rule包下创建nacos包,然后把各种场景的配置规则拉取和推送的实现类写到此包下
可以参考Sentinel Dashboard test包下的流控规则拉取和推送的实现逻辑
官方demo
我们看看官方的demo是如何实现的
首先创建一个NacosConfigUtil类,用来定义常量
public final class NacosConfigUtil { // 其实demo中也就用到了上面两个常量 // 定义配置中心的分组名,这里需要和微服务端进行配对,不然dashboard推送一个分组,微服务结果从另一个分组去读取配置 public static final String GROUP_ID = "SENTINEL_GROUP"; // 定义配置文件dataId的一个后缀,一般命名就是 serviceName + 后缀。当然dataId也要和微服务那边读取配置保存一样 // 避免你写一个dataId,微服务从另一个dataId去读 public static final String FLOW_DATA_ID_POSTFIX = "-flow-rules"; public static final String PARAM_FLOW_DATA_ID_POSTFIX = "-param-rules"; public static final String CLUSTER_MAP_DATA_ID_POSTFIX = "-cluster-map"; public static final String CLIENT_CONFIG_DATA_ID_POSTFIX = "-cc-config"; public static final String SERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX = "-cs-transport-config"; public static final String SERVER_FLOW_CONFIG_DATA_ID_POSTFIX = "-cs-flow-config"; public static final String SERVER_NAMESPACE_SET_DATA_ID_POSTFIX = "-cs-namespace-set"; private NacosConfigUtil() {} }创建一个NacosConfig配置类,这里就定义了流控规则相关的转换器
@Configuration public class NacosConfig { // 流控规则相关 定义 List 到 String的转换器 @Bean public Converter flowRuleEntityEncoder() { return JSON::toJSONString; } // 流控规则相关 定义 String 到 List的转换器 @Bean public Converter flowRuleEntityDecoder() { return s -> JSON.parseArray(s, FlowRuleEntity.class); } // 根据一个Nacos的serverAddr,创建ConfigService对象。推送配置/拉取配置都是通过该对象来完成的 @Bean public ConfigService nacosConfigService() throws Exception { return ConfigFactory.createConfigService("localhost"); } }接下来我们来看看dashboard推送规则配置的实现代码,它实现了DynamicRulePublisher接口
@Component("flowRuleNacosPublisher") public class FlowRuleNacosPublisher implements DynamicRulePublisher { // 注入上面配置类的中定义的ConfigService和Converter转换器 @Autowired private ConfigService configService; @Autowired private Converter converter; @Override public void publish(String app, List rules) throws Exception { AssertUtil.notEmpty(app, "app name cannot be empty"); if (rules == null) { return; } // 调用Nacos的configService.publishConfig(..)方法 推送配置 // dataId为 appName + 最上方的常量文件后缀-flow-rules , 分组为最上方定义的常量SENTINEL_GROUP , 并对规则配置集合转换为json字符串 configService.publishConfig(app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, converter.convert(rules)); } }接下来我们来看看dashboard拉取规则配置的实现代码,它实现了DynamicRuleProvider接口
@Component("flowRuleNacosProvider") public class FlowRuleNacosProvider implements DynamicRuleProvider { // 注入上面配置类的中定义的ConfigService和Converter转换器 @Autowired private ConfigService configService; @Autowired private Converter converter; @Override public List getRules(String appName) throws Exception { // 调用Nacos的configService.getConfig(dataId, group, timeoutMs)方法 拉取配置 String rules = configService.getConfig(appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID, 3000); if (StringUtil.isEmpty(rules)) { return new ArrayList(); } // 将json字符串转换为 List 规则实体对象集合 return converter.convert(rules); } }官方Demo这种方式功能上的确是实现了与Nacos通信,对Nacos配置中心进行读写。但存在一个小问题。那就是dashboard这边规则实体对象是FlowRuleEntity,但是微服务端规则实体对象是FlowRule。Nacos把配置推送给微服务端时,微服务端把json字符串转换为实体对象时可能就会出现不匹配的情况 —> 微服务规则实体对象没有相应的值 ----> 内存中的规则也就不完善 ----> 出现了dashboard端更新的规则微服务端未生效情况。
当然,流控规则都还好,如下图所示,这两个之间的实体对象成员属性基本上都能对应上
但热点规则这边的实体就不行了,他们之间的层级关系就不同了
public class ParamFlowRuleEntity extends AbstractRuleEntity { public ParamFlowRuleEntity() { } // ParamFlowRule为客户端的规则实体,但是这里将一整个实体对象变为了ParamFlowRuleEntity的其中一个属性 // 所以这里转json之后的层级关系就发生了改变 public ParamFlowRuleEntity(ParamFlowRule rule) { AssertUtil.notNull(rule, "Authority rule should not be null"); // 父类中的属性 this.rule = rule; } ... } // 父类 public abstract class AbstractRuleEntity implements RuleEntity { protected Long id; protected String app; protected String ip; protected Integer port; // ParamFlowRule为客户端的规则实体,成为了ParamFlowRuleEntity实体的一个成员属性 protected T rule; private Date gmtCreate; private Date gmtModified; ... }为了解决这种情况,那么就需要定义一个规范,存入Nacos配置中心的数据只能是微服务那边的规则实体对象,不能是dashboard这边的规则实体对象
修改源码实现
naocs配置中心保存的是微服务端的规则实体对象
各个规则都先在dashboard端将规则实体转换为微服务能用的规则实体在推送至Nacos配置中心
从Nacos配置中心获取配置后,都先将json字符串转换为dashboard端的规则实体对象
项目结构如下
配置类
import com.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.ApiDefinitionEntity; import com.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.GatewayFlowRuleEntity; import com.alibaba.csp.sentinel.dashboard.datasource.entity.rule.RuleEntity; import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.fastjson.JSON; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.exception.NacosException; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; public final class NacosConfigUtil { // 定义配置中心的分组名,这里需要和微服务端进行配对,不然dashboard推送一个分组,微服务结果从另一个分组去读取配置 public static final String GROUP_ID = "SENTINEL_GROUP"; // 定义配置文件dataId的一个后缀,一般命名就是 serviceName + 后缀。当然dataId也要和微服务那边读取配置保存一样 // 避免你写一个dataId,微服务从另一个dataId去读 public static final String FLOW_DATA_ID_POSTFIX = "-flow-rules"; public static final String PARAM_FLOW_DATA_ID_POSTFIX = "-param-flow-rules"; public static final String DEGRADE_DATA_ID_POSTFIX = "-degrade-rules"; public static final String SYSTEM_DATA_ID_POSTFIX = "-system-rules"; public static final String AUTHORITY_DATA_ID_POSTFIX = "-authority-rules"; public static final String GATEWAY_FLOW_DATA_ID_POSTFIX = "-gateway-flow-rules"; public static final String GATEWAY_API_DATA_ID_POSTFIX = "-gateway-api-rules"; public static final String CLUSTER_MAP_DATA_ID_POSTFIX = "-cluster-map"; /** * cc for `cluster-client` */ public static final String CLIENT_CONFIG_DATA_ID_POSTFIX = "-cc-config"; /** * cs for `cluster-server` */ public static final String SERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX = "-cs-transport-config"; public static final String SERVER_FLOW_CONFIG_DATA_ID_POSTFIX = "-cs-flow-config"; public static final String SERVER_NAMESPACE_SET_DATA_ID_POSTFIX = "-cs-namespace-set"; //超时时间 public static final int READ_TIMEOUT = 3000; private NacosConfigUtil() {} /** * RuleEntity----->Rule * 控制台这边的规则实体都是RuleEntity类型的,这里就调用各个规则实体对象中的toRule()方法,转换为微服务端的规则实体对象 * 例如 FlowRuleEntity#toRule ----> FlowRule ParamFlowRuleEntity#toRule ----> ParamFlowRule * @param entities * @return */ public static String convertToRule(List
-
- 和文件的读数据源一样,继承了AbstractDataSource类,这样就不需要我们再去写一遍加载配置、更新内存中的配置
-








