Posted by lily's blog on May 9, 2025

抽奖项目简介

参考文章:

  1. xfg博客园
  2. 抽奖系统设计

一个抽奖,想让用户合理的别中奖(品牌方不能亏损),又要能达到拉新促活的效果、提高平台曝光量的需求(keep之前就有一个跑步抽奖的活动很出圈),并且抽奖细节从其设计到落地实现都是需要精细化控制成本的。

现在各个互联网产品场景中,都有抽奖模块来拉新促活,如;平台签到积分抽奖、商城支付完成抽奖、打车优惠发券抽奖都需依赖抽奖模块

"你以为你玩的是抽奖,但其实玩的是你对人性的理解!这也是营销中最复杂的产品运营设定,既要让用户玩的开心,又有让平台有的赚。所以纵横交错逻辑功能实现的营销组也是各个互联网公司中代码实现度最复杂的组。"
					----xgf

展示给你的是抽奖,没展示给你的全是手段!

  • 首次,免费抽奖,可设定抽奖范围。让你第一次抽奖超过62.9%,甚至达到99%全是随机积分,而这个随机的积分有时候恰好够你消耗所剩积分完成一次抽奖。
  • 而后,随机积分,也是远小于所需抽奖的积分,目的就是为了消耗你的积分值。100积分,最后得到20积分。类似斗地主每把都有入场费。
  • 再有,增加的大奖抽奖,必须用户抽奖n次后解锁。都抽奖1次了,再来2次就解锁了。这个时候你就忘记了自己要赞积分,指向点下10连抽。
  • 最后,再有一个6000积分消耗必中奖策略,让用户知道反正最后会得到一次非积分的奖品,赶紧梭哈!

这些,还只是你表面能看见了的,看不见的还有一些配置的手段。比如你是个老羊毛用户,那么你的抽奖根本就是个摆设,你能得到的奖品都是风控兜底,比如100积分,抽到1积分。

这样的复杂系统非常适合使用 DDD 进行领域建模设计,通过领域的拆解分析得到所需开发的各项功能领域。通过这样的手段,也能更好的管理后续需求的迭代

DDD分层基础

本项目是基于DDD架构搭建的微服务分布式项目, DDD分层是项目基础架构设计的一种模式,类似于web开发中的MVC架构,我觉得将DDD分层架构与曾经学习过的spring clould Alibaba微服务架构类比会更相似些。

![[Pasted image 20241120092421.png]]

为什么用DDD

DDD结构它是一种充血模型结构,所有的服务实现都以领域为核心, 应用层定义接口,领域层(domian)实现接口, 领域层(domian)定义数据仓储,基础层实现数据仓储中关于DAO和Redis的操作, 但同时几方又有互相的依赖。

如果按照模块化拆分,那么会需要做一些处理,包括:

  1. 应用层,不再给领域层定义接口,而是自行处理对领域层接口的包装。否则领域层既引入了应用层的Jar,应用层也引入了领域层的Jar,就会出现循环依赖的问题。
  2. 基础层中的数据仓储的定义也需要从领域层剥离,否则也会出现循环依赖的问题。
  3. RPC 层定义接口描述,包括:入参Req、出参Res、DTO对象,接口信息,这些内容定义出来的Jar给接口层使用,也给外部调用方使用。

如上,方便项目管理和迭代…

  1. DDD架构实现视频
  2. 抽奖DDD架构文章
  3. 项目基础手脚架

应用层

application 逻辑包装,对于抽奖流程编排、其中包括比如定时任务的编排、领域事件的消息发布和订阅

领域层

domian 封装具体的业务领域功能实现,是聚合的,充血的。 领域模型服务,是一个非常重要的模块。无论怎么做DDD的分层架构,domain 都是肯定存在的。在一层中会有一个个细分的领域服务,在每个服务包中会有【模型、仓库、服务】这样3部分。

  • 为隐藏领域层的业务逻辑实现,所有领域方法和服务等均须通过领域服务对外暴露。
  • 为实现微服务内聚合之间的解耦,原则上禁止跨聚合的领域服务调用和跨聚合的数据相互关联。

  • service 服务设计;把一些重核心业务方到 service 里实现

基础层

infrastructure 基础层依赖于 domain 领域层,因为在 domain 层定义了 【mapper】仓储接口需要在基础层实现。 本身领域层的业务实现是需要依赖,这是依赖倒置的一种设计方式。 提供基础的功能和服务,包括:提供了与数据库交互的持久层代码,Redis工具类,一些持久层对象(PO对象)。

  • repository 仓储服务;从数据库等数据源中获取数据,传递的对象可以是聚合对象、实体对象,返回的结果可以是;实体对象、值对象。因为仓储服务是由基础层(infrastructure) 引用领域层(domain),是一种依赖倒置的结构,但它可以天然的隔离PO数据库持久化对象被引用。

接口层

interfaces 用于处理用户发送的Restful请求和解析用户输入的配置文件等。 暴露对外提供抽奖的接口,抽奖的具体流程是在应用层中编排的,所以接口层会依赖应用层,会调用应用层中具体的接口实现。 微服务中引用的 RPC 需要对外提供接口的描述信息,也就是调用方在使用的时候,需要引入 Jar 包,让调用方好能依赖接口的定义做代理。 实现rpc的接口定义,引入应用层服务,封装具体的接口。(各个模块的接口)

  • 依赖关系:
  • lottery-infrastructure
  • lottery-rpc
  • lottery-domain
  • lottery-application

RPC层

-trigger (rpc层) 因为使用 RPC 框架的时候,需要对外提供描述接口信息的 Jar 让外部调用方引入才可以通过反射调用到具体的方法提供者,那么这个时候,RPC 需要暴露出来。 触发器层,一般也被叫做 adapter 适配器层。用于提供RPC接口实现、消息接收、任务执行等。所以对于这样的操作,把它叫做触发器层。 描述RPC接口文件,用于打包后外部引入POM配置。(内部互相调用的接口)

通用层

common 通用类型定义层,在我们的系统开发中,会有很多类型的定义,包括;基本的 Response、Constants 和枚举。它会被其他的层进行引用使用。

代码结构

首先是domian模块中抽奖模块的实现 抽奖系统工程采用DDD架构 + Module模块方式搭建,lottery-domain 是专门用于开发领域服务的模块,不限于目前的抽奖策略在此模块下实现还有活动领域、规则引擎、用户服务等。

strategy 是第1个在 domain 下实现的抽奖策略领域,在领域功能开发的服务下主要含有model、repository、service三块区域,其中:

  • model,用于提供vo、req、res 和 aggregates 聚合对象。
  • repository,提供仓储服务,其实也就是对Mysql、Redis等数据的统一包装。
  • service,是具体的业务领域逻辑实现层,在这个包下定义了algorithm抽奖算法实现和具体的抽奖策略包装 draw 层,对外提供抽奖接口 IDrawExec::doDrawExec() todo…

分层关系图

![[Pasted image 20240612133022.png]]

RPC框架的跑通

为什么使用dubbo?

dubbo是一款阿里自研的RPC框架

  1. 因为dubbo底层通信是Socket而不是http所以通信效率会更高
  2. dubbo有分布式高可用的设计,在某组服务宕机后会被从注册中心摘除,之后流量会打到其他的服务上 类比于openfeign
    OpenFeign是一个基于REST的远程过程调用(RPC)框架,它可帮助开发人员发起HTTP请求并调用远程服务。它可以简化与远程服务的通信,并提供了一种声明性的方式来定义客户端与服务端之间的交互。
    

    dubbo如何使用

    dubbo版本更新 注意: 目前项目使用dubbo版本为2.7.15 因为原版本dubbo项目版本已经不再维护@service注解,此时再使用广播直连模式会找不到服务端 (原代码中@Service被标记已过时)从而报错

dubbo的使用分为接口调用方以及接口提供方

接口提供方

提供接口的描述信息:接口定义以及内部方法的定义 一般在DDD结构中定义为 api:Dubbo接口定义模块。只定义接口,当然也可以包含一些定义传输的dto,返回、接收类型类(Types)。这部分结构需要对外提供jar包,也就是接口的描述信息(在另一方调用之前需要先install到本地中央仓库去)。 但是需要注意:

- 所有的 Dubbo 接口,出入参,默认都需要继承 Serializable 接口。也就是 UserReqDTO、UserResDTO、Response 这3个类,都得继承 Serializable 序列化接口。

app:应用程序启动模块。 springboot的项目启动入口 trigger:接口实现模块。 此处的接口启用是实际调用的函数/代码的地方,可以自定义实现代码,类比于service层的实现

@Slf4j
@DubboService(version = "1.0.0")
public class UserService implements IUserService {

    @Override
    public Response<UserResDTO> queryUserInfo(UserReqDTO reqDTO) {
        log.info("查询用户信息 userId: {} reqStr: {}", reqDTO.getUserId(), JSON.toJSONString(reqDTO));
        try {
            // 1. 模拟查询【你可以从数据库或者Redis缓存获取数据】
            UserResDTO resDTO = UserResDTO.builder()
                    .userId(reqDTO.getUserId())
                    .userName("小傅哥")
                    .userAge(20)
                    .build();

            // 2. 返回结果
            return Response.<UserResDTO>builder()
                    .code(Constants.ResponseCode.SUCCESS.getCode())
                    .info(Constants.ResponseCode.SUCCESS.getInfo())
                    .data(resDTO).build();
        } catch (Exception e) {
            log.error("查询用户信息失败 userId: {} reqStr: {}", reqDTO.getUserId(), JSON.toJSONString(reqDTO), e);
            return Response.<UserResDTO>builder()
                    .code(Constants.ResponseCode.UN_ERROR.getCode())
                    .info(Constants.ResponseCode.UN_ERROR.getInfo())
                    .build();
        }
    }

}

但此时为RPC调用所以service需要使用Dubbo自定义的注解

工程配置yml

application.yml(主启动类中的springboot配置方式) 项目使用dubbo的配置:

  1. 其中服务注册发现使用zookeeper的直连测试模式
  2. 包扫描要扫描到对外提供服务的jar包接口定义处 ```yml dubbo: application: name: xfg-dev-tech-dubbo version: 1.0.0 registry: address: zookeeper://127.0.0.1:2181 # N/A - 无zookeeper可配置 N/A 走直连模式测试 protocol: name: dubbo port: 20881 scan: base-packages: cn.bugstack.dev.tech.dubbo.api
或者是消费者和服务端都使用无注册中心的广播模式
```yml
# Dubbo 广播方式配置  
dubbo:  
  application:  
    name: Lottery  
    version: 1.0.0  
  registry:  
    address: N/A #multicast://224.5.6.7:1234  
  protocol:  
    name: dubbo  
    port: 8101  
  scan:  
    base-packages: com.lily.lottery.rpc

注意

  • base-packages配置的是spring约定扫描的包,dubbo的接口模块要想被扫描到,主项目的pom中应该要间接或直接的依赖接口模块。

  • spring讲究约定大于配置,Application的应用的包名应该要涵盖覆盖到其他的包名。例如Application所在包名为 com.lily.dev.dubbo 那么api接口的包名层级应该比该层级多一层或者同层级才可扫到com.lily.dev.dubbo.api。如果此时Application所在包名为 com.lily.dev.dubbo.a.b.c 那么api接口的包名将不能够被扫描到。

  • address:如果配置的是 N/A 就是不走任何注册中心,就是个直连,主要用于本地验证的。如果你配置了 zookeeper://127.0.0.1:2181 就需要先安装一个 zookeeper 。另外,即使配置了注册中心的方式,也可以直连测试。

    应用构建

    整个项目执行install,jar包就会进入本地的Maven仓库,本地调用可以使用该jar包;其次就可以通过deploy将本地的jar包发布到中央仓库,即使不在本地环境也可以使用

    服务使用方

    1. pom引入 ```xml
com.lily xxx-dubbo-api 1.0-SNAPSHOT
2. 消费配置
使用方也需要配置对应的dubbo属性
```yml
dubbo:
  application:
    name: xfg-dev-tech-dubbo
    version: 1.0.0
  registry:
     address: zookeeper://127.0.0.1:2181
#    address: N/A
  protocol:
    name: dubbo
    port: 20881
  1. 代码调用 ```java // 直连模式;@DubboReference(interfaceClass = IUserService.class, url = “dubbo://127.0.0.1:20881”, version = “1.0.0”) @DubboReference(interfaceClass = IUserService.class, version = “1.0.0”) private IUserService userService;

@Test public void test_userService() { UserReqDTO reqDTO = UserReqDTO.builder().userId(“10001”).build(); Response resDTO = userService.queryUserInfo(reqDTO); log.info("测试结果 req: {} res: {}", JSON.toJSONString(reqDTO), JSON.toJSONString(resDTO)); }


- 配置了 zookeeper 就用第一个,代码中对应 `@DubboReference(interfaceClass = IUserService.class, version = "1.0.0")`
- 配置了 N/A 就用第二个,代码中必须指定直连。`@DubboReference(interfaceClass = IUserService.class, url = "dubbo://127.0.0.1:20881", version = "1.0.0")`

# 抽奖整体业务设计

## 业务流程
#### 流程编排
[抽奖流程编排](https://gitcode.net/KnowledgePlanet/Lottery/-/wikis/%E7%AC%AC-2-%E9%83%A8%E5%88%86-%E9%A2%86%E5%9F%9F%E5%BC%80%E5%8F%91//%E7%AC%AC12%E8%8A%82%EF%BC%9A%E5%9C%A8%E5%BA%94%E7%94%A8%E5%B1%82%E7%BC%96%E6%8E%92%E6%8A%BD%E5%A5%96%E8%BF%87%E7%A8%8B)
- 抽奖整个活动过程的流程编排,主要包括:**用户对活动的参与(领取)、对抽奖的操作、对中奖结果的存放,以及如何处理发奖,对于发奖流程我们设计为MQ触发。**
- 对于每一个流程节点编排的内容,都是在领域层开发完成的,而应用层只是做最为简单的且很薄的一层。_其实这块也很符合目前很多低代码的使用场景,通过界面可视化控制流程编排,生成代码_

业务流程图:
![[Pasted image 20241012212947.png]]

#### 多表数据幂等性考虑

表结构:
1. 分别在两个分库的表 lottery_01.user_take_activity(用户参与活动记录表)、lottery_02.user_take_activity 中添加 **state**`【活动单使用状态 0未使用、1已使用】` 状态字段,**这个状态字段用于写入中奖信息到 user_strategy_export_000~003 (用户中奖订单表)表中时候,两种表可以做一个幂等性的事务**。
2. 同时还需要加入 strategy_id 策略ID字段,用于**处理领取了活动单但执行抽奖失败时,可以继续获取到此抽奖单继续执行抽奖,而不需要重新领取活动**。_其实领取活动就像是一种活动镜像信息,可以在控制幂等反复使用_

代码层面:
1. `this.grabActivity()` **方法【用户参与活动方法:包括修改`user_take_activity,user_take_activity_count`两表数据】**,用户领取活动时候,新增记录:strategy_id、state 两个字段,这两个字段就是为了处理用户对领取镜像记录的二次处理未执行抽奖的领取单,以及state状态控制事务操作的幂等性。
## 抽奖策略算法

**单体概率、总体概率两种**抽奖算法描述:
场景A20%、B30%、C50%
- **总体概率**:如果A奖品抽空后,B和C奖品的概率按照 `3:5` 均分,相当于B奖品中奖概率由 `0.3` 升为 `0.375`。
	- 是基于抽奖概率在改变的中奖方式,为了保证所有参与抽奖的用户抽取奖品的概率都相等,而不是根据秒杀速度决定的,需要实时改变【**奖品概率分布的hash数组**】,保证所有剩下的奖品中奖概率和维持在100%。
- **单项概率**:如果A奖品抽空后,B和C保持目前中奖概率,用户抽奖扔有20%中为A,因A库存抽空则结果展示为未中奖。为了运营成本,通常这种情况的使用的比较多。
	- 是基于奖品获奖概率不变的抽奖方式,所有奖品的中奖概率,奖品概率分布的hash数组不变,基于秒杀特性决定,所有用户同时并发抽奖。

### 斐波那契算法
用于初始化中奖hash数组
todo
### 总体概率抽奖
- 首先要从总的中奖列表中排除掉那些被排除掉的奖品,这些奖品会涉及到概率的值重新计算。
- 如果排除后剩下的奖品列表小于等于1,则可以直接返回对应信息
- 接下来就使用随机数工具生产一个100内的随值与奖品列表中的值进行循环比对,算法时间复杂度O(n)
代码
```java
@Component("defaultRandomDrawAlgorithm")  
public class DefaultRandomDrawAlgorithm extends BaseAlgorithm {  
    //  
    /**  
     * 随机抽奖  
     * 父类中未实现的抽象方法  
     * @param strategyId 策略ID  
     * @param excludeAwardIds 已经被抽完的奖品ID列表  
     *                        不用于构建概率元组的奖品信息  
     * @return 奖品ID  
     */    @Override  
    public String randomDraw(Long strategyId, List<String> excludeAwardIds) {  
  
        /**  
         * 排除已经被抽完的奖品ID,保存剩下的还可以抽奖的ID  
         * 若全部奖品都被抽完了,则直接返回空字符串,说明没有获奖  
         * 如果只剩下一个奖品了,那么之间获得该奖品  
         * 若有多个排除奖品,则新建抽奖规则随机抽取一个奖品  
         * todo: 优化  
         */  
        // 初始化差异分母  
        BigDecimal differenceDenominator = BigDecimal.ZERO;  
        // 初始化非抽奖奖品列表  
        List<AwardRateInfo> differenceAwardRateList = new ArrayList<>();  
        // 获取概率元组中所有的奖品种类  
        List<AwardRateInfo> awardRateInfos = awardRateInfoMap.get(strategyId);  
        // 遍历所有奖品种类  
        for (AwardRateInfo awardRateInfo : awardRateInfos) {  
            // 获取奖品ID  
            String awardId = awardRateInfo.getAwardId();  
            // 如果奖品还没有被抽完则加入差异奖品列表  
            if (!excludeAwardIds.contains(awardId)) {  
                differenceAwardRateList.add(awardRateInfo);  
                differenceDenominator = differenceDenominator.add(awardRateInfo.getAwardRate());  
            }  
        }  
        // 如果差异奖品列表为空则返回空字符串  
        if (differenceAwardRateList.isEmpty()) return "";  
        // 如果差异奖品列表只有一个则返回该奖品ID  
        if (differenceAwardRateList.size() == 1) return differenceAwardRateList.get(0).getAwardId();  
        /**  
         * 新建规则重新分配概率抽奖  
         * 从差异奖品列表中随机抽取奖品  
         */  
        // 获取随机概率值为0-100的随机数  
        SecureRandom secureRandom = new SecureRandom();  
        int randomVal = secureRandom.nextInt(100) + 1;  
        // 循环获取奖品  
        String awardId = "";  
        int cursorVal = 0;  
        for (AwardRateInfo awardRateInfo : differenceAwardRateList) {  
            // 计算奖品概率值  
            // Divide the award rate by the difference denominator, multiply by 100, and round up to the nearest integer  
            int rateVal = awardRateInfo.getAwardRate().divide(differenceDenominator, 2, BigDecimal.ROUND_UP).multiply(new BigDecimal(100)).intValue();  
            // 如果随机数小于等于当前奖品概率值则返回该奖品ID  
            if (randomVal <= (cursorVal + rateVal)) {  
                awardId = awardRateInfo.getAwardId();  
                break;  
            }  
            cursorVal += rateVal;  
        }  
        // 抽中当前奖品ID  
        return awardId;  
    }  
}****

单项概率抽奖

算法描述:单项概率算法不涉及奖品概率重新计算的问题,分配好的概率结果是可以固定下来的 主要是直接获取和查询之前的概率元组

/**  
 * Created by lily via on 2024/6/15 14:59 * 单次抽奖随机抽奖算法实现  
 * 策略模式与工厂模式结合  
 *  
 * 场景A20%、B30%、C50%  
 * 总体概率:如果A奖品抽空后,B和C奖品的概率按照 3:5 均分,  
 * 相当于B奖品中奖概率由 0.3 升为 0.375  
 */@Component("singleRateRandomDrawAlgorithm")  
//@Primary  
public class SingleRateRandomDrawAlgorithm extends BaseAlgorithm {  
  
    /**  
     * @description: 单项随机概率抽奖  
     * @author Lily Via  
     * @date 2024/6/15 17:38  
     * @version 1.0  
     */    @Override  
    public String randomDraw(Long strategyId, List<String> excludeAwardIds) {  
        // 获取策略对应的元祖  
        String[] rateTuple = rateTupleMap.get(strategyId);  
        // todo: 校验rateTuple不为空  
        assert rateTuple != null;  
        // 生成随机索引  
        int randomVal = new SecureRandom().nextInt(100) + 1;  
        // 转换为斐波那契散列索引  
        int idx = hashIdx(randomVal);  
        // 查询是否中奖  
        String awardId = rateTuple[idx];  
        // 如果抽中奖品在排除奖品列表中则返回未中奖(说明这个奖品已经抽完)  
        if (excludeAwardIds.contains(awardId)) {  
            return "未中奖";  
        }  
        return awardId;  
    }  
}

项目完善

子模块依赖配置


现有工程模块依赖关系:

  • lottery-application,应用层,引用:domain
  • lottery-common,通用包,引用:
  • lottery-domain,领域层,引用:infrastructure
  • lottery-infrastructure,基础层,引用:
  • lottery-interfaces,接口层,引用:applicationrpc
  • lottery-rpc,RPC接口定义层,引用:common

此时项目着重配置以下三个模块

  • lottery-infrastructure
  • lottery-interfaces
  • lottery-rpc

    interfaces接口模块

    模块结构

    定义了主启动类,由于该模块是整个项目的出口。

  • 该模块实现了RPC层定义的接口
  • 在配置文件中定义mybatis配置(在接口实现类中会通过mybatis访问数据仓),项目启动端口号,以及dubbo对应配置
  • 对应的需要mybatis的 mapper配置文件 ![[Pasted image 20240613210022.png]] dubbo对应的yml配置
    # Dubbo 广播方式配置
    dubbo:
    application:
      name: Lottery
      version: 1.0.0
    registry:
      address: N/A #multicast://224.5.6.7:1234
    protocol:
      name: dubbo
      port: 20880
    scan:
      base-packages: com.lily.lottery.rpc
    
  • 广播模式的配置唯一区别在于注册地址,registry.address = multicast://224.5.6.7:1234(此时为注册地址),服务提供者和服务调用者都需要配置相同的📢广播地址。或者配置为 N/A 用于直连模式使用
  • application,配置应用名称和版本
  • protocol,配置的通信协议和端口
  • scan,相当于 Spring 中自动扫描包的地址,可以把此包下的所有 rpc 接口都注册到服务中
    pom文件

    lottery-interfaces 作为系统的 war 包工程,在构建工程时候需要依赖于 POM 中配置的相关信息。那这里就需要注意下,作为 Lottery 工程下的主 pom.xml 需要完成对 SpringBoot 父文件的依赖,此外还需要定义一些用于其他模块可以引入的配置信息,比如:jdk版本、编码方式等。而其他层在依赖于工程总 pom.xml 后还需要配置自己的信息。

  • lottery-interfaces 是整个程序的出口,也是用于构建 War 包的工程模块,所以你会看到一个 <packaging>war</packaging> 的配置。
  • 在 dependencies 会包含所有需要用到的 SpringBoot 配置,也会包括对其他各个模块的引入。
  • 在 build 构建配置上还会看到一些关于测试包的处理,比如这里包括了资源的引入也可以包括构建时候跳过测试包的配置。 ```xml
lottery-interfaces war org.springframework.boot spring-boot-starter-web ... Lottery src/main/resources true **/** src/test/resources true **/** org.springframework.boot spring-boot-maven-plugin org.apache.maven.plugins maven-compiler-plugin 8</source> 8
#### RPC模块
##### 模块结构
该模块是消费者调用需要依赖的模块,其中定义了一个活动类接口以及对应的传输时的包装类
模块.项目结构如下:
![[Pasted image 20240613205636.png]]
##### pom文件
```xml
<parent>
    <artifactId>Lottery</artifactId>
    <groupId>cn.itedus.lottery</groupId>
    <version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>lottery-rpc</artifactId>
<packaging>jar</packaging>
<dependencies>
    <dependency>
        <groupId>cn.itedus.lottery</groupId>
        <artifactId>lottery-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>
<build>
    <finalName>lottery-rpc</finalName>
    <plugins>
        <!-- 编译plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>${jdk.version}</source>
                <target>${jdk.version}</target>
                <compilerVersion>1.8</compilerVersion>
            </configuration>
        </plugin>
    </plugins>
</build>

定义开发 RPC 接口

开发接口

@DubboService
public class ActivityBooth implements IActivityBooth {

    @Resource
    private IActivityDao activityDao;

    @Override
    public ActivityRes queryActivityById(ActivityReq req) {

        Activity activity = activityDao.queryActivityById(req.getActivityId());

        ActivityDto activityDto = new ActivityDto();
        activityDto.setActivityId(activity.getActivityId());
        activityDto.setActivityName(activity.getActivityName());
        activityDto.setActivityDesc(activity.getActivityDesc());
        activityDto.setBeginDateTime(activity.getBeginDateTime());
        activityDto.setEndDateTime(activity.getEndDateTime());
        activityDto.setStockAllTotal(activity.getStockAllTotal());
        activityDto.setStockDayTotal(activity.getStockDayTotal());
        activityDto.setTakeAllCount(activity.getStockAllTotal());
        activityDto.setTakeDayCount(activity.getStockDayTotal());

        return new ActivityRes(new Result(Constants.ResponseCode.SUCCESS.getCode(), Constants.ResponseCode.SUCCESS.getInfo()), activityDto);
    }

}
  • 用于实现 RPC 接口的实现类 ActivityBooth 上有一个注解 @DubboService,这个注解是来自于 Dubbo 的 org.apache.dubbo.config.annotation.Service,也就是这个包下含有此注解配置的类可以被 Dubbo 管理。
  • 在 queryActivityById 功能实现中目前还比较粗糙,但大体可以看出这是对数据库的操作以及对结果的封装,提供 DTO 的对象并返回 Res 结果。目前dto的创建后续可以使用门面模式和工具类进行处理

    抽奖活动库表设计

需要哪些库表?

  • 活动配置,activity:提供活动的基本配置
  • 策略配置,strategy:用于配置抽奖策略,概率、玩法、库存、奖品
  • 策略明细,strategy_detail:抽奖策略的具体明细配置
  • 奖品配置,award:用于配置具体可以得到的奖品
  • 用户参与活动记录表,user_take_activity:每个用户参与活动都会记录下他的参与信息,时间、次数
  • 用户活动参与次数表,user_take_activity_count:用于记录当前参与了多少次
  • 用户策略计算结果表,user_strategy_export_001~004:最终策略结果的一个记录,也就是奖品中奖信息的内容

抽奖活动表

用于测试简单的rpc流程, 但是注意,插入数据时会发生乱码问题。 活动表:是一个用于配置抽奖活动的总表,用于存放活动信息,包括:ID、名称、描述、时间、库存、参与次数等。

CREATE TABLE `activity` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',
  `activity_id` bigint(20) NOT NULL COMMENT '活动ID',
  `activity_name` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '活动名称',
  `activity_desc` varchar(128) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '活动描述',
  `begin_date_time` datetime DEFAULT NULL COMMENT '开始时间',
  `end_date_time` datetime DEFAULT NULL COMMENT '结束时间',
  `stock_count` int(11) DEFAULT NULL COMMENT '库存',
  `take_count` int(11) DEFAULT NULL COMMENT '每人可参与次数',
  `state` tinyint(2) DEFAULT NULL COMMENT '活动状态:1编辑、2提审、3撤审、4通过、5运行(审核通过后worker扫描状态)、6拒绝、7关闭、8开启',
  `creator` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '创建人',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `unique_activity_id` (`activity_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='活动配置';

1. lottery.sql

活动信息表

活动库存可以理解为风控的库存,也就是参与此次抽奖项目能够提供的所有奖品的数量。

create database lottery;

-- auto-generated definition
create table activity
(
    id            bigint auto_increment comment '自增ID',
    activityId    bigint       null comment '活动ID',
    activityName  varchar(64)  not null comment '活动名称',
    activityDesc  varchar(128) null comment '活动描述',
    beginDateTime datetime     not null comment '开始时间',
    endDateTime   datetime     not null comment '结束时间',
    stockCount    int          not null comment '库存',
    takeCount     int          null comment '每人可参与次数',
    state         int          null comment '活动状态:编辑、提审、撤审、通过、运行、拒绝、关闭、开启',
    creator       varchar(64)  not null comment '创建人',
    createTime    datetime     not null comment '创建时间',
    updateTime    datetime     not null comment '修改时间',
    constraint activity_id_uindex
        unique (id)
)
    comment '活动配置';

alter table activity
    add primary key (id);

奖品信息表

-- auto-generated definition
create table award
(
    id           bigint(11) auto_increment comment '自增ID'
        primary key,
    awardId      bigint                             null comment '奖品ID',
    awardType    int(4)                             null comment '奖品类型(文字描述、兑换码、优惠券、实物奖品暂无)',
    awardCount   int                                null comment '奖品数量',
    awardName    varchar(64)                        null comment '奖品名称',
    awardContent varchar(128)                       null comment '奖品内容「文字描述、Key、码」',
    createTime   datetime default CURRENT_TIMESTAMP null comment '创建时间',
    updateTime   datetime default CURRENT_TIMESTAMP null comment 'updateTime'
)
    comment '奖品配置';

配置策略表

抽奖活动的具体细节配置,细节id,此次抽奖使用哪一种算法,抽奖、发奖方式(即使、定时)、发奖时间

-- auto-generated definition
create table strategy
(
    id           bigint(11) auto_increment comment '自增ID'
        primary key,
    strategyId   bigint(11)   not null comment '策略ID',
    strategyDesc varchar(128) null comment '策略描述',
    strategyMode int(4)       null comment '策略方式「1:单项概率、2:总体概率」',
    grantType    int(4)       null comment '发放奖品方式「1:即时、2:定时[含活动结束]、3:人工」',
    grantDate    datetime     null comment '发放奖品时间',
    extInfo      varchar(128) null comment '扩展信息',
    createTime   datetime     null comment '创建时间',
    updateTime   datetime     null comment '修改时间',
    constraint strategy_strategyId_uindex
        unique (strategyId)
)
    comment '策略配置';

-- auto-generated definition

抽奖活动详情表

描述的是一个活动中、策略用什么、奖品有哪些、每个奖品的中奖概率分别有什么

create table strategy_detail  
(  
    id                bigint(11) auto_increment comment '自增ID'  
        primary key,  
    strategyId        bigint(11)    not null comment '策略ID',  
    awardId           bigint(11)    null comment '奖品ID',  
    awardCount        int           null comment '奖品数量',  
    awardRate         decimal(5, 2) null comment '中奖概率',  
    createTime        datetime      null comment '创建时间',  
    updateTime        datetime      null comment '修改时间',  
    awardSurplusCount int default 0 null comment '奖品剩余库存'  
)  
    comment '抽奖情况表';

lottery_01.sql

  1. lottery_01.sql ~ lottery_02.sql 每个用户参与抽奖活动后都

    用户参与活动记录表

    ```sql create database lottery_01;

DROP TABLE IF EXISTS user_take_activity;
CREATE TABLE user_take_activity (
id bigint(20) NOT NULL AUTO_INCREMENT COMMENT ‘自增ID’,
u_id varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT ‘用户ID’,
take_id bigint(20) NULL DEFAULT NULL COMMENT ‘活动领取ID’,
activity_id bigint(20) NULL DEFAULT NULL COMMENT ‘活动ID’,
activity_name varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT ‘活动名称’,
take_date datetime(0) NULL DEFAULT NULL COMMENT ‘活动领取时间’,
take_count int(11) NULL DEFAULT NULL COMMENT ‘领取次数’,
strategy_id int(11) NULL DEFAULT NULL COMMENT ‘抽奖策略ID’,
state tinyint(4) NULL DEFAULT NULL COMMENT ‘活动单使用状态 0未使用、1已使用’,
uuid varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT ‘防重ID’,
create_time datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT ‘创建时间’,
update_time datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT ‘更新时间’,
PRIMARY KEY (id) USING BTREE,
UNIQUE INDEX idx_uuid(uuid) USING BTREE COMMENT ‘防重ID’
) ENGINE = InnoDB AUTO_INCREMENT = 32 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin COMMENT = ‘用户参与活动记录表’ ROW_FORMAT = Dynamic;


#### 用户中奖结果表
mq_state
```sql

-- auto-generated definition
DROP TABLE IF EXISTS `user_strategy_export_001`;  
CREATE TABLE `user_strategy_export_001`  (  
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',  
  `u_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '用户ID',  
  `activity_id` bigint(20) NULL DEFAULT NULL COMMENT '活动ID',  
  `order_id` bigint(20) NULL DEFAULT NULL COMMENT '订单ID',  
  `strategy_id` bigint(20) NULL DEFAULT NULL COMMENT '策略ID',  
  `strategy_mode` tinyint(4) NULL DEFAULT NULL COMMENT '策略方式(1:单项概率、2:总体概率)',  
  `grant_type` tinyint(4) NULL DEFAULT NULL COMMENT '发放奖品方式(1:即时、2:定时[含活动结束]、3:人工)',  
  `grant_date` datetime(0) NULL DEFAULT NULL COMMENT '发奖时间',  
  `grant_state` tinyint(4) NULL DEFAULT NULL COMMENT '发奖状态',  
  `award_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '发奖ID',  
  `award_type` tinyint(4) NULL DEFAULT NULL COMMENT '奖品类型(1:文字描述、2:兑换码、3:优惠券、4:实物奖品)',  
  `award_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '奖品名称',  
  `award_content` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '奖品内容「文字描述、Key、码」',  
  `uuid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '防重ID',  
  `mq_state` tinyint(4) NULL DEFAULT NULL COMMENT '消息发送状态(0未发送、1发送成功、2发送失败)',  
  `create_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',  
  `update_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '更新时间',  
  PRIMARY KEY (`id`) USING BTREE,  
  UNIQUE INDEX `idx_uuid`(`uuid`) USING BTREE  
) ENGINE = InnoDB AUTO_INCREMENT = 9 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin COMMENT = '用户策略计算结果表' ROW_FORMAT = Dynamic;


DROP TABLE IF EXISTS `user_take_activity_count`;  
CREATE TABLE `user_take_activity_count`  (  
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',  
  `u_id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '用户ID',  
  `activity_id` bigint(20) NULL DEFAULT NULL COMMENT '活动ID',  
  `total_count` int(11) NULL DEFAULT NULL COMMENT '可领取次数',  
  `left_count` int(11) NULL DEFAULT NULL COMMENT '已领取次数',  
  `create_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',  
  `update_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '更新时间',  
  PRIMARY KEY (`id`) USING BTREE,  
  UNIQUE INDEX `idx_uId_activityId`(`u_id`, `activity_id`) USING BTREE  
) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin COMMENT = '用户活动参与次数表' ROW_FORMAT = Dynamic;


  1. 这些库表是用于支撑起抽奖系统开发的必备表,后续可能会随着功能的开发做适当的调整。
  2. 接下来我们会围绕这些库表一点点实现各个领域的功能,包括:抽奖策略领域、奖品发放领域、活动信息领域等

简答题

表结构是怎么设计的: 活动配置表:活动ID,开始、结束时间,活动库存,活动状态,活动策略

抽奖策略配置表:策略ID,策略描述,抽奖算法,发奖方式,发奖时间,奖品编号,奖品库存

用户表:参与活动记录表,活动ID,参与时间,可参与的次数等。

用户抽奖计算结果表:活动ID,是否中奖,奖品ID,发奖状态,发奖时间等。

# 项目设计模式

模板、工厂、策略模式使用在哪些地方?

工厂模式

工厂模式:提供一个接口或抽象类用于创建对象,允许子类去决定实例化哪一个类。 实现发奖: 首先定义一个抽象的奖品类,然后每种奖品设计自己的实现类,创建一个配置类,把所有奖品类的实现封装到map中,定义一个工厂类,通过map.get()把发货奖品的实现交给子类去实现,减少if else的使用,并且,定义了新的奖品类之后也可以直接加入map,能够减少代码改动。

模板模式

模板模式: 核心概念是在抽象类中定义一个算法的实现框架,具体的实现步骤延迟到子类去实现,重点是固定一套算法流程,不改变实现方式。定义了参与活动、抽奖、发奖的整体流程。首先定义一个抽象类,这个抽象类中包含了一个模板方法,可以用final关键字修饰,他是一个具体的方法,定义了整个过程的流程和步骤,除此之外,还会定义抽象步骤方法,具体的实现在子类中进行实现。

使用场景/编排流程:

public abstract class BaseActivityPartake extends ActivityPartakeSupport implements IActivityPartake {  
  
    @Resource  
    private Map<Constants.Ids, IIdGenerator> idGeneratorMap;  
  
    @Override  
    public PartakeResult doPartake(PartakeReq req) {  
  
        // 1. 查询是否存在未执行抽奖领取活动单【user_take_activity 存在 state = 0,领取了但抽奖过程失败的,可以直接返回领取结果继续抽奖】  
        UserTakeActivityVO userTakeActivityVO = this.queryNoConsumedTakeActivityOrder(req.getActivityId(), req.getuId());  
        if (null != userTakeActivityVO) {  
            return buildPartakeResult(userTakeActivityVO.getStrategyId(), userTakeActivityVO.getTakeId(), Constants.ResponseCode.NOT_CONSUMED_TAKE);  
        }  
  
        // 2. 查询活动账单  
        ActivityBillVO activityBillVO = super.queryActivityBill(req);  
  
        // 3. 活动信息校验处理【活动库存、状态、日期、个人参与次数】  
        Result checkResult = this.checkActivityBill(req, activityBillVO);  
        if (!Constants.ResponseCode.SUCCESS.getCode().equals(checkResult.getCode())) {  
            return new PartakeResult(checkResult.getCode(), checkResult.getInfo());  
        }  
  
        // 4. 扣减活动库存,通过Redis【活动库存扣减编号,作为锁的Key,缩小颗粒度】 Begin        StockResult subtractionActivityResult = this.subtractionActivityStockByRedis(req.getuId(), req.getActivityId(), activityBillVO.getStockCount(), activityBillVO.getEndDateTime());  
  
        if (!Constants.ResponseCode.SUCCESS.getCode().equals(subtractionActivityResult.getCode())) {  
            this.recoverActivityCacheStockByRedis(req.getActivityId(), subtractionActivityResult.getStockKey(), subtractionActivityResult.getCode());  
            return new PartakeResult(subtractionActivityResult.getCode(), subtractionActivityResult.getInfo());  
        }  
  
        // 5. 插入领取活动信息【个人用户把活动信息写入到用户表】  
        Long takeId = idGeneratorMap.get(Constants.Ids.SnowFlake).nextId();  
        Result grabResult = this.grabActivity(req, activityBillVO, takeId);  
        if (!Constants.ResponseCode.SUCCESS.getCode().equals(grabResult.getCode())) {  
            this.recoverActivityCacheStockByRedis(req.getActivityId(), subtractionActivityResult.getStockKey(), grabResult.getCode());  
            return new PartakeResult(grabResult.getCode(), grabResult.getInfo());  
        }  
  
        // 6. 扣减活动库存,通过Redis End  
        this.recoverActivityCacheStockByRedis(req.getActivityId(), subtractionActivityResult.getStockKey(), Constants.ResponseCode.SUCCESS.getCode());  
  
        return buildPartakeResult(activityBillVO.getStrategyId(), takeId, activityBillVO.getStockCount(), subtractionActivityResult.getStockSurplusCount(), Constants.ResponseCode.SUCCESS);  
    }

用户领取活动 | 执行抽奖 使用不同抽奖算法,那么抽奖过程的细节也是不同的 | 结果落库 | 发送MQ触发发奖流程(异步实现) | 返回结果


策略模式

策略模式:实现多种算法,主要是指将每个算法都封装起来,并且使他们之间在使用时可以互换。定义一个抽奖策略的实现接口,每种抽奖算法实现该接口形成对应的抽奖类,根据运行中传递的参数选择对应的实现方式,可以将所有策略的实现方式封装在map中,这样可以更加灵活的扩展。它的特点就是将策略的实现和对策略的选择分开。

状态模式

抽奖活动审核,状态流转编排 定义很多流程状态,状态对应的对象我们会做成具体实现类,每个对象对这些编排的状态都有自身流转的行为定义。

组合模式

参考文章组合模式

  1. 组合模式是一种结构型设计模式, 你可以使用它将相似对象组合成类树状结构, 并且能像使用独立对象一样使用它们。
  2. 组合图形Compound­Graphic是一个容器, 它可以由多个包括容器在内的子图形构成。 组合图形与简单图形拥有相同的方法。 但是, 组合图形自身并不完成具体工作, 而是**将请求递归地传递给自己的子项目, 然后 “汇总” 结果。

分库分表路由实现

sharding-jdbc使用

与项目代码结合实践,文章中组件使用部分已整理笔记 [[2024-10-11-分库分表组件]]

事务管理

实际上由于spring本身也提供了项目的事务管理功能,添加@Transactional注解默认开启。但是由于sharding-jdbc提供了 分布式事务,为了提高其可扩展性我们优先选择sharding-jdbc的事务管理。 详情见:sharding-jdbc事务管理器详解:[[2024-10-11-分库分表组件]]

  1. 本地事务的开启仅需加注解 @Transactional即可,@ShardingTransaction(TransactionType.LOCAL)是选择sharding的分布式事务类型,此注解不添加的化会默认使用本地事务。
  2. 使用分布式事务选用TransactionType.XA注解即可

    业务流程

    在参与活动环节中我们通过模板模式定义了三个流程:

  3. 校验本次活动是否可以参与(活动信息配置生成存储在ActivityBillVO bill对象中)
    1. 校验:活动状态
    2. 校验:活动日期
    3. 校验:活动库存
    4. 校验:个人库存 - 个人活动剩余可领取次数
  4. 扣减活动表可参与活动次数
    1. 扣减活动库存【目前为直接对配置库中的 lottery.activity 直接操作表扣减库存,后续优化为Redis扣减】![[Pasted image 20241012195940.png]]
  5. 在用户参与活动表新增用户参与活动记录(grabActivity)
    1. 在用户个人参与活动记录表中 扣减个人可参与次数![[Pasted image 20241012200156.png]]
    2. 在用户活动关联表中 插入用户参与活动信息![[Pasted image 20241012200312.png]]

为保证活动参与表与用户参与活动表数据的一致性,所以我们选择对grabActivity方法添加注解式事务管理。

业务实现

由于涉及到事务管理的两个表(user_take_activity、user_take_activity_count)只进行了分库并未进行分表(lottery01),因此选用本地事务管理器即可。 【因为local本地事务管理完全支持非跨库事务,例如:仅分表,或分库但是路由的结果在单库中。】 ![[Pasted image 20241012204447.png]] 在需要事务的方法中添加相关注解即可,例如:

@ShardingTransactionType(TransactionType.LOCAL)
@Transactional

完整代码:

@ShardingTransactionType(TransactionType.LOCAL) 
@Transactional
@Override  
protected Result grabActivity(PartakeReq partake, ActivityBillVO bill) {
                // 扣减个人已参与次数  
                int updateCount = userTakeActivityRepository.subtractionLeftCount(bill.getActivityId(), bill.getActivityName(), bill.getTakeCount(), bill.getUserTakeLeftCount(), partake.getuId(), partake.getPartakeDate());  
                if (0 == updateCount) {  
                    status.setRollbackOnly();  
                    logger.error("领取活动,扣减个人已参与次数失败 activityId:{} uId:{}", partake.getActivityId(), partake.getuId());  
                    return Result.buildResult(Constants.ResponseCode.NO_UPDATE);  
                }  
  
                // 插入领取活动信息  
                Long takeId = idGeneratorMap.get(Constants.Ids.SnowFlake).nextId();  
                userTakeActivityRepository.takeActivity(bill.getActivityId(), bill.getActivityName(), bill.getTakeCount(), bill.getUserTakeLeftCount(), partake.getuId(), partake.getPartakeDate(), takeId);  
            } catch (DuplicateKeyException e) {  
                status.setRollbackOnly();  
                logger.error("领取活动,唯一索引冲突 activityId:{} uId:{}", partake.getActivityId(), partake.getuId(), e);  
                return Result.buildResult(Constants.ResponseCode.INDEX_DUP);  
            }  
            return Result.buildSuccessResult();  
}

验证结果

准备数据: ![[Pasted image 20241012212227.png]] ![[Pasted image 20241012212244.png]] 结果验证: ![[Pasted image 20241012212315.png]]

场景问题

规则引擎——决策树

决策树简介

除了普通抽奖以外,项目还提供了根据不同人群画像从规则引擎树中查找并匹配到合适该用户抽奖的活动id,再参与抽奖方式。因此我们采用组合模式把不同的决策条件和决策细节都封装到树形结构中。

规则引擎的核心算法原理包括决策树构建、决策树执行、规则匹配、规则执行。 规则引擎分为 规则管理器 + 推理引擎。

组合模式: 参考文章组合模式

  1. 组合模式是一种结构型设计模式, 你可以使用它将相似对象组合成类树状结构, 并且能像使用独立对象一样使用它们。
  2. 组合图形Compound­Graphic是一个容器, 它可以由多个包括容器在内的子图形构成。 组合图形与简单图形拥有相同的方法。 但是, 组合图形自身并不完成具体工作, 而是将请求递归地传递给自己的子项目, 然后 “汇总” 结果。

规则树需要存储Mysql数据库中,因为放在数据库可以进行动态化配置,无需大量改动代码,只需要新增对应的规则引擎即可(filter)。

  1. 首先我们需要一个表保存规则树的结构
    • 例如根据性别和年龄划分的规则树,需要先根据性别筛选人群,再根据年龄范围筛选某类人群可以参与哪个活动
  2. 还需要一张表用于记录所有从根节点到果实节点的一条可量化筛选人群的规则链路,即这颗规则树的全部从根节点到叶子节点的路径
    • 例如某个用户画像【男,26岁】,会根据过滤器进入性别男、年龄>=25岁、参与id=xxx活动的链路中。
  3. 又因为不同的规则树只能保存一组不同条件的用户筛选逻辑,所以还需要一个表用于记录所有种类的规则树。
    • 例如抽奖活动需要根据年龄、性别画像组成的规则树;也需要根据下单量、性别组成规则树。

![[Pasted image 20241014165522.png]]

库表设计

  1. 创建表rule_tree用于保存所有不同类型的规则树,方便拓展不同的过滤规则
  2. 创建表rule_tree_node用于定义规则树的树形结构
  3. 创建表rule_tree_node_line用于保存(同一)规则树的所有过滤链路
-- ----------------------------  
-- Table structure for rule_tree  
-- ----------------------------  
DROP TABLE IF EXISTS `rule_tree`;  
CREATE TABLE `rule_tree` (  
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',  
  `tree_name` varchar(64) DEFAULT NULL COMMENT '规则树Id',  
  `tree_desc` varchar(128) DEFAULT NULL COMMENT '规则树描述',  
  `tree_root_node_id` bigint(20) DEFAULT NULL COMMENT '规则树根ID',  
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',  
  PRIMARY KEY (`id`)  
) ENGINE=InnoDB AUTO_INCREMENT=10002 DEFAULT CHARSET=utf8;  
  
-- ----------------------------  
-- Records of rule_tree  
-- ----------------------------  
BEGIN;  
INSERT INTO `rule_tree` VALUES (2110081902, '抽奖活动规则树', '用于决策不同用户可参与的活动', 1, '2021-10-08 15:38:05', '2021-10-08 15:38:05');  
COMMIT;  
  
-- ----------------------------  
-- Table structure for rule_tree_node  
-- ----------------------------  
DROP TABLE IF EXISTS `rule_tree_node`;  
CREATE TABLE `rule_tree_node` (  
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',  
  `tree_id` int(2) DEFAULT NULL COMMENT '规则树ID',  
  `node_type` int(2) DEFAULT NULL COMMENT '节点类型;1子叶、2果实',  
  `node_value` varchar(32) DEFAULT NULL COMMENT '节点值[nodeType=2];果实值',  
  `rule_key` varchar(16) DEFAULT NULL COMMENT '规则Key',  
  `rule_desc` varchar(32) DEFAULT NULL COMMENT '规则描述',  
  PRIMARY KEY (`id`)  
) ENGINE=InnoDB AUTO_INCREMENT=123 DEFAULT CHARSET=utf8;  
  
-- ----------------------------  
-- Records of rule_tree_node  
-- ----------------------------  
BEGIN;  
INSERT INTO `rule_tree_node` VALUES (1, 2110081902, 1, NULL, 'userGender', '用户性别[男/女]');  
INSERT INTO `rule_tree_node` VALUES (11, 2110081902, 1, NULL, 'userAge', '用户年龄');  
INSERT INTO `rule_tree_node` VALUES (12, 2110081902, 1, NULL, 'userAge', '用户年龄');  
INSERT INTO `rule_tree_node` VALUES (111, 2110081902, 2, '100001', NULL, NULL);  
INSERT INTO `rule_tree_node` VALUES (112, 2110081902, 2, '100002', NULL, NULL);  
INSERT INTO `rule_tree_node` VALUES (121, 2110081902, 2, '100003', NULL, NULL);  
INSERT INTO `rule_tree_node` VALUES (122, 2110081902, 2, '100004', NULL, NULL);  
COMMIT;  
  
-- ----------------------------  
-- Table structure for rule_tree_node_line  
-- ----------------------------  
DROP TABLE IF EXISTS `rule_tree_node_line`;  
CREATE TABLE `rule_tree_node_line` (  
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',  
  `tree_id` bigint(20) DEFAULT NULL COMMENT '规则树ID',  
  `node_id_from` bigint(20) DEFAULT NULL COMMENT '节点From',  
  `node_id_to` bigint(20) DEFAULT NULL COMMENT '节点To',  
  `rule_limit_type` int(2) DEFAULT NULL COMMENT '限定类型;1:=;2:>;3:<;4:>=;5<=;6:enum[枚举范围];7:果实',  
  `rule_limit_value` varchar(32) DEFAULT NULL COMMENT '限定值',  
  PRIMARY KEY (`id`)  
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;  
  
-- ----------------------------  
-- Records of rule_tree_node_line  
-- ----------------------------  
BEGIN;  
INSERT INTO `rule_tree_node_line` VALUES (1, 2110081902, 1, 11, 1, 'man');  
INSERT INTO `rule_tree_node_line` VALUES (2, 2110081902, 1, 12, 1, 'woman');  
INSERT INTO `rule_tree_node_line` VALUES (3, 2110081902, 11, 111, 3, '25');  
INSERT INTO `rule_tree_node_line` VALUES (4, 2110081902, 11, 112, 4, '25');  
INSERT INTO `rule_tree_node_line` VALUES (5, 2110081902, 12, 121, 3, '25');  
INSERT INTO `rule_tree_node_line` VALUES (6, 2110081902, 12, 122, 4, '25');  
COMMIT;

实现

工程结构

Lottery
└── src
    └── main
       └── java
          └── cn.itedus.lottery.domain.rule
              ├── model
              │   ├── aggregates
              │   │   └── TreeRich.java
              │   ├── req
              │   │   └── DecisionMatterReq.java # 
              │   ├── res
              │   │   └── EngineResult.java
              │   └── vo
              │       ├── TreeNodeLineVO.java
              │       ├── TreeNodeVO.java
              │       └── TreeRootVO.java	
              └── service
                  ├── engine
                  │   ├── impl	
                  │   │   └── TreeEngineHandle.java
                  │   ├── EngineBase.java 
                  │   ├── EngineConfig.java
                  │   └── IEngine.java	
                  └── logic
                      ├── impl	
                      │   ├── UserAgeFilter.java
                      │   └── UserGenderFilter.java
                      ├── BaseLogic.java
                      └── LogicFilter.java

代码实现

封装规则过滤器

// 年龄规则
@Component
public class UserAgeFilter extends BaseLogic {
    @Override
    public String matterValue(DecisionMatterReq decisionMatter) {
        return decisionMatter.getValMap().get("age").toString();
    }
}

// 性别规则
@Component
public class UserGenderFilter extends BaseLogic {
    @Override
    public String matterValue(DecisionMatterReq decisionMatter) {
        return decisionMatter.getValMap().get("gender").toString();
    }
    
}

遍历规则路径获取果实节点

  • 这里主要提供决策树流程的处理过程,有点像通过链路的关系(性别、年龄)在二叉树中寻找果实节点的过程。
  • 同时提供一个抽象方法,执行决策流程的方法供外部去做具体的实现。
public class EngineBase extends EngineConfig implements EngineFilter {

    private Logger logger = LoggerFactory.getLogger(EngineBase.class);

    @Override
    public EngineResult process(DecisionMatterReq matter) {
        throw new RuntimeException("未实现规则引擎服务");
    }

    protected TreeNodeVO engineDecisionMaker(TreeRuleRich treeRuleRich, DecisionMatterReq matter) {
        TreeRootVO treeRoot = treeRuleRich.getTreeRoot();
        Map<Long, TreeNodeVO> treeNodeMap = treeRuleRich.getTreeNodeMap();

        // 规则树根ID
        Long rootNodeId = treeRoot.getTreeRootNodeId();
        TreeNodeVO treeNodeInfo = treeNodeMap.get(rootNodeId);

        // 节点类型[NodeType];1子叶、2果实
        while (Constants.NodeType.STEM.equals(treeNodeInfo.getNodeType())) {
            String ruleKey = treeNodeInfo.getRuleKey();
            LogicFilter logicFilter = logicFilterMap.get(ruleKey);
            String matterValue = logicFilter.matterValue(matter);
            Long nextNode = logicFilter.filter(matterValue, treeNodeInfo.getTreeNodeLineInfoList());
            treeNodeInfo = treeNodeMap.get(nextNode);
            logger.info("决策树引擎=>{} userId:{} treeId:{} treeNode:{} ruleKey:{} matterValue:{}", treeRoot.getTreeName(), matter.getUserId(), matter.getTreeId(), treeNodeInfo.getTreeNodeId(), ruleKey, matterValue);
        }

        return treeNodeInfo;
    }

}

遍历过程:

![[Pasted image 20241014173544.png]]

规则过滤器基础抽象类

  • 在抽象方法中实现了接口方法,同时定义了基本的决策方法;1、2、3、4、5等于、小于、大于、小于等于、大于等于的判断逻辑。
  • 同时定义了抽象方法,让每一个实现接口的类都必须按照规则提供决策值,这个决策值用于做逻辑比对。
public abstract class BaseLogic implements LogicFilter {
    @Override
    public Long filter(String matterValue, List<TreeNodeLineVO> treeNodeLineInfoList) {
        for (TreeNodeLineVO nodeLine : treeNodeLineInfoList) {
            if (decisionLogic(matterValue, nodeLine)) {
                return nodeLine.getNodeIdTo();
            }
        }
        return Constants.Global.TREE_NULL_NODE;
    }
    /**
     * 获取规则比对值
     * @param decisionMatter 规则比较判断值。在本例中为性别、年龄信息。
     * @return 比对值
     */
    @Override
    public abstract String matterValue(DecisionMatterReq decisionMatter);
    private boolean decisionLogic(String matterValue, TreeNodeLineVO nodeLine) {
        switch (nodeLine.getRuleLimitType()) {
            case Constants.RuleLimitType.EQUAL:
                return matterValue.equals(nodeLine.getRuleLimitValue());
            case Constants.RuleLimitType.GT:
                return Double.parseDouble(matterValue) > Double.parseDouble(nodeLine.getRuleLimitValue());
            case Constants.RuleLimitType.LT:
                return Double.parseDouble(matterValue) < Double.parseDouble(nodeLine.getRuleLimitValue());
            case Constants.RuleLimitType.GE:
                return Double.parseDouble(matterValue) >= Double.parseDouble(nodeLine.getRuleLimitValue());
            case Constants.RuleLimitType.LE:
                return Double.parseDouble(matterValue) <= Double.parseDouble(nodeLine.getRuleLimitValue());
            default:
                return false;
        }
    }
}

![[Pasted image 20241014171547.png]]

测试结果

@Resource
    private EngineFilter engineFilter;
    @Test
    public void test_process() {
        DecisionMatterReq req = new DecisionMatterReq();
        req.setTreeId(2110081902L);
        req.setUserId("lily");
        req.setValMap(new HashMap<String, Object>() );
        EngineResult res = engineFilter.process(req);
        logger.info("请求参数:{}", JSON.toJSONString(req));
        logger.info("测试结果:{}", JSON.toJSONString(res));
    }

输出

c.i.lottery.test.domain.ActivityTest     : 请求参数{"treeId":2110081902,"userId":"lily","valMap":{"gender":"man","age":"25"}}
09:30:59.355  INFO 53959 --- [           main] c.i.lottery.test.domain.ActivityTest     : 测试结果{"nodeId":112,"nodeValue":"100002","success":true,"treeId":2110081902,"userId":"lily"}

通过测试结果找到 "nodeValue":"100002" 这个 100002 就是用户 fustack 可以参与的活动号。

思考题

规则引擎的设计目的,根据什么来筛选?

  1. 目标精准性:某些活动中,组织者可能希望抽奖针对特定的人群,所以需要进行过滤一。
  2. 多活动管理:如果多个抽奖活动同时进行,不同的活动可能有不同的目标和人群。
  3. 动态规则更改:可以方便的更改筛选条件规则而不是嵌套使用if else,无需更改整个抽奖系统。

实现方式:引擎规则是一个决策树判断,定义一个过滤的接口,抽象类封装一个比对接口,具体类返回决策key对应的值,比如性别“女”,年龄23这样子,交给抽象类去比对。 可以根据制定的规则把节点的filter逻辑封装到map中得到树的信息,然后遍历,拿出每个节点的key比如gender,得到具体值进行比对,得到下一个node,再从map中进行取值依次遍历。 性别、年龄、消费情况、身份、搜索、点击等。

规则都是既定的,为什么还要用决策树?决策树和布尔检索有区别吗?

答:规则是既定的,决策树可以帮助我们组合这些规则,是的在确定场景下,可以根据不同的输入条件执行不同的路径,布尔检索主要是一种检索技术,如果是这样的话。

MQ解耦发奖流程

![[Pasted image 20241013185018.png]]

业务流程

代码层面:

  • 在数据库表 user_strategy_export 添加字段 mq_state 这个字段用于记录 MQ 发送状态。
  • 启动 kafka 新增 topic:lottery_invoice 用于发货单消息,当抽奖完成后则发送一个发货单,再异步处理发货流程,这个部分就是MQ的解耦流程使用。
  • ActivityProcessImpl#doDrawProcess 活动抽奖流程编排中补全用户抽奖后,发送MQ触达异步奖品发送的流程。

业务逻辑层面:

MQ用于解耦抽奖结果落库与发货系统,当获得抽奖结果之后,我们将获奖记录存入用户抽奖结果表中,接着发送MQ消息进行实际奖品发货的异步处理。

抽奖结果产生后,要进行奖品发货的流程,其中涉及到调用发货服务、库存扣减、更新库表发货成功记录、发货失败重试MQ消息重试(重试机制耗时)等一系列操作,使用MQ解耦消息可以使得立马获知抽奖结果而不被这一系列的后续操作所影响。

业务流程图 ![[Pasted image 20241013185018.png]]

接口幂等性(?

项目中的接口如何幂等实现? 答:程序中接口的幂等性,是基于数据防重字段实现的,比如UUID唯一索引,在插入时会抛异常,一般会加入查询、数据库缓存的方式减少对数据库的写操作。

消息幂等性保证

  1. 库表中记录MQ消息发送状态:消费者发送消息成功与否需要记录在库表state字段中。
  2. 定时任务确保消息发送:使用定时任务扫描发送失败任务,确保MQ消息发送成功。
  3. Kafka重试机制:MQ消息重试机制保证消费消息可靠性。

保证 MQ 消息重试的前提就是服务的幂等性,也就是每个抽奖结果只会对应一次发货流程。如果没有幂等性保障,在重试的过程中可能会造成流程异常,比如状态更新次数多了、数据库插入多了、给用户发奖多了等等,这种情况下会发生资损问题。

在发送 MQ 时可能会失败,不管成功率多高我们都需要考虑失败的情况。所以我们要更新库表状态记录MQ消费发送成功与否,如果发送失败则需要使用 定时任务worker 来补偿 MQ 发送。

所以MQ 发送完成到消费,也是可能有失败的,比如处理失败、更新库表失败等,但无论是什么失败都需要保证 MQ 进行重试处理。

回调函数

// 5. 发送MQ,触发发奖流程  
InvoiceVO invoiceVO = buildInvoiceVO(drawOrderVO);  
ListenableFuture<SendResult<String, Object>> future = kafkaProducer.sendLotteryInvoice(invoiceVO);  
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {  
  
    @Override  
    public void onSuccess(SendResult<String, Object> stringObjectSendResult) {  
        // 4.1 MQ 消息发送完成,更新数据库表 user_strategy_export.mq_state = 1        activityPartake.updateInvoiceMqState(invoiceVO.getuId(), invoiceVO.getOrderId(), Constants.MQState.COMPLETE.getCode());  
    }  
  
    @Override  
    public void onFailure(Throwable throwable) {  
        // 4.2 MQ 消息发送失败,更新数据库表 user_strategy_export.mq_state = 2 【等待定时任务扫码补偿MQ消息】  
        activityPartake.updateInvoiceMqState(invoiceVO.getuId(), invoiceVO.getOrderId(), Constants.MQState.FAIL.getCode());  
    }  
  
});

最终一致性保证

消息队列选型

参考文章:

  1. 消息队列场景性能对比
  2. 架构对比
  • RocketMQ一般适用于消息可靠性要求较高的场景,例如金融消费系统
  • Kafka适用于吞吐量高,大数据处理和流式处理的场景。通过高效的分区和副本机制保证了高可用和数据一致性。是大数据处理场景下的优选消息队列

为什么主项目使用的是RocketMQ,抽奖模块使用kafka呢?

首先在这样的高并发场景选用Kafka是很合适的,(谈论二者架构上的优势和区别【详见RocketMQ和Kafka架构区别的文章】),我希望这个抽奖模块可以从主项目中脱离出来成为一个功能丰富且完善的独立服务,因为很多营销场景下都需要类似抽奖的功能,所以这个抽奖系统设计之初就是希望可以给多方营销系统提供直接封装好的抽奖服务,比如说在之前过年的时候支付宝提供的一个全民集福之后参与的抽奖瓜分奖金的活动,这样的场景下如果使用kafka作为消息队列就是很合适的,其次是由于之前学习过了RocketMQ,所以现在主要是为了学习而去接触和使用。

思考题

为什么使用卡夫卡,怎么配置卡夫卡,消息丢失怎么办,消费失败怎么办: 为什么:高吞吐量,适用于高性能生产和消费场景;持久性,可以保持大量数据;可以实时处理数据流。

首先要在pom文件里引入卡夫卡的依赖,yml中配置监听者消费者的属性,比如端口号、错误重试次数、序列化方式、以及监听的线程数等。启动zookeeper服务器和卡夫卡服务器。

怎么办:重试策略,设置重试次数和时间间隔;将不能发送的数据持久化到数据库中;当发送失败次数达到一定阈值时,启动报警机制。

怎么办:重试机制,如果多次无法消费,放入死信队列。报警机制。

xxl-job是干啥的?为什么选择xxl-job执行定时任务 XXL-JOB是一个分布式任务调度平台,定时扫描上线的任务进行调度,在指定时间内执行某些操作。使任务的监控和管理变得简单明了。

xxl-job在抽奖系统中主要可以扫描日志或数据库中是否有MQ消息发送失败,并且定时控制活动状态的变更。

具体怎么实现:首先应该要有日志记录,操作失败时会有日志记录下来。使用xxl-job设定一个定时任务,比如隔一段时间就扫描查找失败的任务,根据业务判断结果执行实际的补偿,可能是重新执行一个数据库操作、发送一个MQ消息等。补偿的结果也要记录。

XXL-Job的使用


@Component  
public class LotteryXxlJob {  
  
    private Logger logger = LoggerFactory.getLogger(LotteryXxlJob.class);  
  
    @Resource  
    private IActivityDeploy activityDeploy;  
  
    @Resource  
    private IActivityPartake activityPartake;  
  
    @Resource  
    private IStateHandler stateHandler;  
  
    @Resource  
    private IDBRouterStrategy dbRouter;  
  
  
    @Resource  
    private KafkaProducer kafkaProducer;  
  
    @XxlJob("lotteryActivityStateJobHandler")  
    public void lotteryActivityStateJobHandler() throws Exception {  
        logger.info("扫描活动状态 Begin");  
  
        List<ActivityVO> activityVOList = activityDeploy.scanToDoActivityList(0L);  
        if (activityVOList.isEmpty()) {  
            logger.info("扫描活动状态 End 暂无符合需要扫描的活动列表");  
            return;  
        }  
  
        while (!activityVOList.isEmpty()) {  
            for (ActivityVO activityVO : activityVOList) {  
                Integer state = activityVO.getState();  
                switch (state) {  
                    // 活动状态为审核通过,在临近活动开启时间前,审核活动为活动中。在使用活动的时候,需要依照活动状态核时间两个字段进行判断和使用。  
                    case 4:  
                        Result state4Result = stateHandler.doing(activityVO.getActivityId(), Constants.ActivityState.PASS);  
                        logger.info("扫描活动状态为活动中 结果:{} activityId:{} activityName:{} creator:{}", JSON.toJSONString(state4Result), activityVO.getActivityId(), activityVO.getActivityName(), activityVO.getCreator());  
                        break;  
                    // 扫描时间已过期的活动,从活动中状态变更为关闭状态【这里也可以细化为2个任务来处理,也可以把时间判断放到数据库中操作】  
                    case 5:  
                        if (activityVO.getEndDateTime().before(new Date())) {  
                            Result state5Result = stateHandler.close(activityVO.getActivityId(), Constants.ActivityState.DOING);  
                            logger.info("扫描活动状态为关闭 结果:{} activityId:{} activityName:{} creator:{}", JSON.toJSONString(state5Result), activityVO.getActivityId(), activityVO.getActivityName(), activityVO.getCreator());  
                        }  
                        break;  
                    default:  
                        break;  
                }  
            }  
  
            // 获取集合中最后一条记录,继续扫描后面10条记录  
            ActivityVO activityVO = activityVOList.get(activityVOList.size() - 1);  
            activityVOList = activityDeploy.scanToDoActivityList(activityVO.getId());  
        }  
  
        logger.info("扫描活动状态 End");  
  
    }  
  
    @XxlJob("lotteryOrderMQStateJobHandler")  
    public void lotteryOrderMQStateJobHandler() throws Exception {  
        // 验证参数  
        String jobParam = XxlJobHelper.getJobParam();  
        if (null == jobParam) {  
            logger.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 错误 params is null");  
            return;  
        }  
  
        // 获取分布式任务配置参数信息 参数配置格式:1,2,3 也可以是指定扫描一个,也可以配置多个库,按照部署的任务集群进行数量配置,均摊分别扫描效率更高  
        String[] params = jobParam.split(",");  
        logger.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 开始 params:{}", JSON.toJSONString(params));  
  
        if (params.length == 0) {  
            logger.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 结束 params is null");  
            return;  
        }  
  
        // 获取分库分表配置下的分表数  
        int tbCount = dbRouter.tbCount();  
  
        // 循环获取指定扫描库  
        for (String param : params) {  
            // 获取当前任务扫描的指定分库  
            int dbCount = Integer.parseInt(param);  
  
            // 判断配置指定扫描库数,是否存在  
            if (dbCount > dbRouter.dbCount()) {  
                logger.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 结束 dbCount not exist");  
                continue;  
            }  
  
            // 循环扫描对应表  
            for (int i = 0; i < tbCount; i++) {  
  
                // 扫描库表数据  
                List<InvoiceVO> invoiceVOList = activityPartake.scanInvoiceMqState(dbCount, i);  
                logger.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 扫描库:{} 扫描表:{} 扫描数:{}", dbCount, i, invoiceVOList.size());  
  
                // 补偿 MQ 消息  
                for (InvoiceVO invoiceVO : invoiceVOList) {  
  
                    ListenableFuture<SendResult<String, Object>> future = kafkaProducer.sendLotteryInvoice(invoiceVO);  
                    future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {  
  
                        @Override  
                        public void onSuccess(SendResult<String, Object> stringObjectSendResult) {  
                            // MQ 消息发送完成,更新数据库表 user_strategy_export.mq_state = 1                            activityPartake.updateInvoiceMqState(invoiceVO.getuId(), invoiceVO.getOrderId(), Constants.MQState.COMPLETE.getCode());  
                        }  
  
                        @Override  
                        public void onFailure(Throwable throwable) {  
                            // MQ 消息发送失败,更新数据库表 user_strategy_export.mq_state = 2 【等待定时任务扫码补偿MQ消息】  
                            activityPartake.updateInvoiceMqState(invoiceVO.getuId(), invoiceVO.getOrderId(), Constants.MQState.FAIL.getCode());  
                        }  
  
                    });  
                }  
            }  
  
        }  
  
        logger.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 完成 param:{}", JSON.toJSONString(params));  
  
    }  
  
}

不超买设计

首先对于库存集中扣减类的业务流程,是不能直接用数据库表抗的。

比如数据库表有一条记录是库存,如果是通过锁这一条表记录更新库存为10、9、8的话,就会出现大量的用户在应用用获得数据库的连接后,等待前一个用户更新完库表记录后释放锁,让下一个用户进入在扣减。

这样随着用户参与量的增加,就会有非常多的用户处于等待状态,而等待的用户是持久数据库的连接的,这个连接资源非常宝贵,你占用了应用中其他的请求就进不来,最终导致一个请求要几分钟才能得到响应。【前台的用户越着急,越疯狂点击,直至越来越卡到崩溃】

所以,对于这样的秒杀场景,我们一般都是使用 redis 缓存来处理库存,它只要不超卖就可以。但也确保一点,不要用一条key加锁和等待释放的方式来处理,这样的效率依然是很低的。所以我们要尽可能的考虑分摊竞争,达到无锁化才是分布式架构设计的核心。

对接wx公众号

ref:

  1. 对接wx公众号

业务设计

搭建微信公众号网关服务 – 消息回复 ![[Pasted image 20241030172530.png]]

代码实现

该实现主要是以微信公众号对话为目标实现的,入口依然采用了小傅哥的组件设计,先根据消息类型来获取对应的处理树,再进行流程处理。

类说明:

  1. LogicFilter:消息过滤逻辑接口
    1. filter:回返回消息内容,可以写入调用的其他逻辑,再返回消息信息。
    2. getLogicLineList:返回当前逻辑接口连接线信息,返回空,则说明该节点已达消息逻辑的尽头,否则为连续型对话,会调用缓存记录当前对话节点信息。
  2. LogicLine:消息过滤逻辑组件的连接对象
    1. accessSign:通完下一个节点需要包含的对话内容,会通过用户内容与 此对象进行对比来选择是否可以继续通往消息逻辑的下一个节点。
    2. nextNode:下一个 LogicFilter
  3. EngineBase:消息逻辑引擎基础实现
    1. continueTalk:根据传入 openId,查询是否有记录的对话节点
    2. saveTalk:保存 <openId, LogicFilter>
    3. removeTalk:移除 openId
    4. router:沿用代码中 router 方法,我为不同的事件类型设立一种根节点的逻辑,第一步会试图找到逻辑根节点。
      • 当前根 LogicFilter:
        • textLogicFilter
        • eventLogicFilter

引擎实现方式:



@Override

public String process(BehaviorMatter request) {

// 获取到消息类型对应逻辑处理类

LogicFilter logicFilter = router(request);

// 如果当前节点没有后续节点, 说明单次消息返回内容

if (null == logicFilter.getLogicLineList()) {

return logicFilter.filter(request);

}

// 为连续型消息节点

String openId = request.getOpenId();

// 尝试获取上次触发过内容的节点

LogicFilter continuedLogicFilter = continueTalk(openId);

// 如果为刚开始对话

if (null == continuedLogicFilter) {

// 记录当前对话内容

saveTalk(openId, logicFilter);

return logicFilter.filter(request);

}

// 继续未完成的对话

// 对话在进行中, 尝试根据回复信息找到下一个对话消息逻辑节点

List<LogicLine> logicLines = continuedLogicFilter.getLogicLineList();

LogicFilter nextNode = null;

// 遍历查询 --》》 这段逻辑似乎还是应该交给 LogicFilter 实现更好!!

for (LogicLine logicLine : logicLines) {

if (request.getContent().contains(logicLine.getAccessSign())) {

nextNode = logicLine.getNextNode();

break;

}

}

if (nextNode == null) {

return "对话消息引擎执行错误, 请按照提示内容重新回复";

}

// 获取当前节点回复的消息

String backMsg = nextNode.filter(request);

// 如果当前回复节点已抵达对话逻辑末尾, 清除对话缓存信息

// 否则进行对话信息保存

if (null == nextNode.getLogicLineList()) {

removeTalk(openId);

} else {

saveTalk(openId, nextNode);

}

return backMsg;

}

  

当前引擎配置代码:

```java

public class EngineConfig {

/**

  • 存储用户回复信息处理节点 <openId, logicFilter>

*/

protected Map<String, LogicFilter> receiveCache = new HashMap<>();

/**

  • 存储不同消息类型LogicFilter根节点 <MsgType, LogicFilter>

*/ protected Map<String, LogicFilter> rootLogicFilterGroup = new HashMap<>();

@Resource private EventLogicFilter eventLogicFilter;

@Resource private RuleHandleLogicFilter ruleHandleLogicFilter;

@Resource private NormalLotteryLogicFilter normalLotteryLogicFilter;

/**

  • 组装消息引擎节点

*/ @PostConstruct public void init() {

// 消息回复类型节点

TextLogicFilter textLogicFilter = new TextLogicFilter();

ArrayList textLogicLines = new ArrayList<>();

LotteryLogicFilter lotteryLogicFilter = new LotteryLogicFilter();

OtherLogicFilter otherLogicFilter = new OtherLogicFilter();

textLogicLines.add(new LogicLine(“抽奖”, lotteryLogicFilter));

textLogicLines.add(new LogicLine(“”, otherLogicFilter));

textLogicFilter.setLogicLineList(textLogicLines);

// 配置抽奖回复规则树

ArrayList lotteryLogicLine = new ArrayList<>();

lotteryLogicLine.add(new LogicLine(“正常”, normalLotteryLogicFilter));

RuleLotteryLogicFilter ruleLotteryLogicFilter = new RuleLotteryLogicFilter();

lotteryLogicLine.add(new LogicLine(“规则”, ruleLotteryLogicFilter));

lotteryLogicFilter.setLogicLineList(lotteryLogicLine);

// 规则抽奖后续节点

ArrayList ruleLotteryLogicLine = new ArrayList<>();

ruleLotteryLogicLine.add(new LogicLine(“”, ruleHandleLogicFilter));

ruleLotteryLogicFilter.setLogicLineList(ruleLotteryLogicLine);

rootLogicFilterGroup.put(“text”, textLogicFilter);

rootLogicFilterGroup.put(“event”, eventLogicFilter);

} }