Cloud Alibaba Seata——分布式事务

一、分布式事务问题由来

1、单体应用被拆分成多个微服务应用,原来的三个模块被拆分成三个独立的应用,分别部署在三台服务器上,且使用自己独立的数据库;

某次的业务操作中,需要调用这三个服务来完成,此时每个服务内部的数据一致性由本地事务来保证,但是全局的数据一致性却没有办法由任意一台的本地事务来保证

一句话:一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题!

2、什么是Seata?

Seata是Alibaba开源的一款分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务——一站式的分布式事务解决方案!

3、Seata完成分布式事务的处理过程

分布式事务处理过程: 1个ID + 3个组件模型

  • 1个ID:全局唯一的事务ID;

  • 3个组件:

Transaction Coordinator——TC – 事务协调者,维护全局和分支事务的状态,驱动全局事务提交或回滚。——总把头

Transaction Manager——TM – 事务管理器,定义并控制全局事务的边界开始全局事务、并最终发起全局提交或全局回滚的决议

Resource Manager——RM – 资源管理器,管理控制分支事务处理的资源,负责分支注册、状态汇报、并接受事务协调器的指令,并驱动分支事务提交或回滚

92.jpg

处理过程:

1、TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID

2、XID在微服务调用链路的上下文中传播;

3、RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖;

4、TM向TC发起针对XID的全局提交或回滚决议

5、TC调度XID下管辖的全部分支事务完成提交或回滚操作;


二、Seata-Server的下载与安装

1、下载地址:  https://github.com/seata/seata/releases

本次选择的版本是V0.9.0

2、解压下载文件,修改conf目录下的 file.conf 配置文件(别忘记备份好习惯);

94.jpg

这一步主要是修改:自定义事务组名称 + 事务日志存储模式为db + 数据库连接信息;

95.jpg

mode = "db"

96.jpg

3、根据上面的配置,我们需要到数据库中创建一个数据库,名为seata

建表脚本使用解压目录 /conf/db_store.sql 中的脚本;此脚本会在 seata 库中创建三张表:分支表+全局表+锁表

97.jpg

4、继续在conf目录下,修改 registry.conf 配置文件:

98.jpg

5、在启动了Nacos的基础上,启动seata-server

99.jpg

到这一步,我们的Seata的本地安装就已经完成了!


三、示例实战——准备工作
 本次用例是一个用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

  • 仓储服务:对给定的商品扣除仓储数量。

  • 订单服务:根据采购需求创建订单。

  • 帐户服务:从用户帐户中扣除余额。

93.jpg

当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,在通过远程调用账户服务来扣减用户账户里的余额,最后在订单服务中修改订单状态为已完成!

该操作跨越三个服务,三个数据库,必然会存在分布式事务问题!

1、创建业务数据库:

  • seata_order:存储订单的数据库

  • seata_storage:存储库存的数据库

  • seata_account:存储账户信息的数据库

CREATE DATABASE seata_order;
CREATE DATABASE seata_storage;
CREATE DATABASE seata_account;

2、在上面3个数据库中分别创建自己的业务表:脚本如下:

CREATE TABLE `t_order` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(11) DEFAULT NULL COMMENT '用户id',
  `product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
  `count` int(11) DEFAULT NULL COMMENT '数量',
  `money` decimal(11,0) DEFAULT NULL COMMENT '金额',
  `status` int(1) DEFAULT NULL COMMENT '订单状态 0:创建中,1:已完结',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

CREATE TABLE `t_storage` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
  `total` int(11) DEFAULT NULL COMMENT '总库存',
  `used` int(11) DEFAULT NULL COMMENT '已用库存',
  `residue` int(11) DEFAULT NULL COMMENT '剩余库存',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

CREATE TABLE `t_account` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(11) DEFAULT NULL COMMENT '用户id',
  `total` decimal(10,0) DEFAULT NULL COMMENT '总额度',
  `used` decimal(10,0) DEFAULT NULL COMMENT '已用额度',
  `residue` decimal(10,0) DEFAULT NULL COMMENT '剩余额度',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

3、在上述的3个库中分别创建对应的回滚日志表

建表脚本是 /conf/db_undo_log.sql

-- 此脚本必须初始化在你当前的业务数据库中,用于AT 模式XID记录。与server端无关(注:业务数据库)
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
drop table `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

4、最终数据库效果为:

100.jpg


四、示例实战——Order-Module配置搭建

首先弄清楚业务流程:

下订单 -> 减库存 -> 扣余额 -> 改(订单)状态

1、新建订单Order-Module

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2020</artifactId>
        <groupId>com.jiguiquan.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>seata-order-service2001</artifactId>

    <dependencies>
        <!--springcloud alibaba nacos discovery 以后服务中将nacos和sentinel一起配-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!--使用seata一定要将自带的版本seata去掉,然后引入我们使用的版本的seata依赖-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-all</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>0.9.0</version>
        </dependency>

        <!--openfeign-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>

        <!--springboot项目web和actuator最好一起走-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <!--数据库操作4件套,jdbc/mysql/连接池druid/orm框架mybatis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
            <!--spring-boot-starter下面的所有版本都已经在父工程中的spring-boot-dependencies包含-->
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
        </dependency>

        <!--热部署devtools-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!--测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

2、application.xml 配置文件:

server:
  port: 2001

spring:
  application:
    name: seata-order-service
  cloud:
    alibaba:
      seata:
        tx-service-group: jgq_tx_group
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource  #当前数据源操作类型
    driver-class-name: org.gjt.mm.mysql.Driver
    url: jdbc:mysql://localhost:3306/cloud2020?useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: root
    password: 831121

feign:
  hystrix:
    enabled: false

logging:
  level:
    io:
      seata: info

mybatis:
  mapper-locations: classpath:mapper/*.xml

3、增加配置文件 file.conf,复制上面的seata配置即可:

101.jpg

4、增加配置文件registry.conf,和上面seata的配置一样:

102.jpg

5、创建domain域模型,叫entity也行,还有dao、mapper文件,项目结构如下:

103.jpg

6、使用Seata对数据源进行代理配置(重要)

Seata在AT模式下解决分布式事务的具体逻辑就是体现在对数据源的代理上面;

创建一个数据源代理配置类DataSourceProxyConfig:

/**
 * 使用seata对数据源进行代理
 * seata在AT模式下解决分布式事务的具体逻辑体现在对数据源的代理上,即对DataSource产生代理变成DataSourceProxy,进行全局事务的管理和协调,因此在整合时,需通过配置类的方式进行配置
 * 且DataSourceProxy必须是@Primary默认的数据源,否则事务不会回滚,无法实现分布式事务
 * @author jigq
 * @create 2020-04-29 11:09
 */
@Configuration
public class DataSourceProxyConfig {
    @Value("${mybatis.mapper-locations}")
    private String mapperLocations;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Primary
    @Bean
    public DataSourceProxy dataSourceProxy(DataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSourceProxy);
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
        bean.setTransactionFactory(new SpringManagedTransactionFactory());
        return bean.getObject();
    }
}

因为我们自己配置了数据源,为了防止循环注入,我们在启动类上需要将“数据源自动装载”排除在外——我们使用了Mybatis的Starter;

7、启动类:

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)  //取消数据源的自动创建
@EnableDiscoveryClient
@EnableFeignClients
public class SeataOrderMain2001 {
    public static void main(String[] args) {
        SpringApplication.run(SeataOrderMain2001.class, args);
    }
}

8、新建库存Storage-Module

9、新建账户Account-Module,这两部分基础结构与Order-Module一样,略;


五、实例实战——业务代码Order-Module

1、在Order-Module模块,创建service代码:

104.jpg

其中OrderService是本层的业务代码,而StorageService和AccountService是OpenFeign的远程调用接口类;

2、开始编写创建订单的业务逻辑代码:

OrderServiceImpl.java:

@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
    @Resource
    private OrderMapper orderMapper;
    @Resource
    private StorageService storageService;
    @Resource
    private AccountService accountService;

    @Override
    public Order create(Order order) {
        log.info("------>开始新建订单");
        order.setStatus(0);
        orderMapper.insert(order);
        System.out.println("此处已获得主键为:"+order.getId());

        log.info("------>订单微服务开始调用库存,做扣减");
        storageService.decrease(order.getProductId(), order.getCount());
        log.info("------>订单微服务开始调用库存,做扣减End");

        log.info("------>订单微服务开始调用账户,做扣减");
        accountService.decrease(order.getUserId(), order.getMoney());
        log.info("------>订单微服务开始调用账户,做扣减End");

        //修改原订单状态,从0到1
        log.info("------>修改订单状态开始");
        order.setStatus(1);
        orderMapper.updateByPrimaryKey(order);
        log.info("------>修改订单状态结束");
        return order;
    }
}

StorageService.java:

@FeignClient(value = "seata-storage-service")
public interface StorageService {
    @PostMapping(value = "/storage/decrease")
    CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}

AccountService.java:

@FeignClient(value = "seata-account-service")
public interface AccountService {
    @PostMapping(value = "/account/decrease")
    CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}

3、编写Controller层OrderController.java:

@RestController
public class OrderController {
    @Autowired
    private OrderService orderService;

    @PostMapping("/order/create")
    public CommonResult<Order> create(@RequestBody Order order){
        Order result = orderService.create(order);
        return new CommonResult<>(200, "创建订单成功", result);
    }
}

4、Order-Module创建成功后,我们可以尝试启动Order2001服务,一切顺利!


六、实例实战——业务代码Storage-Module

StorageController.java

@RestController
public class StorageController {
    @Autowired
    private StorageService storageService;

    public CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count){
        storageService.decrease(productId, count);
        return new CommonResult(200, "扣减库存成功");
    }
}

StorageServiceImpl.java:

@Service
@Slf4j
public class StorageServiceImpl implements StorageService {
    @Resource
    private StorageMapper storageMapper;

    @Override
    public void decrease(Long productId, Integer count) {
      log.info("------>storage-service中扣减库存开始");
      storageMapper.decrease(productId, count);
      log.info("------>storage-service中扣减库存结束");
    }
}

StorgaeMapper.java:

StorageMapper.xml:

void decrease(@Param("productId") Long productId, @Param("count") Integer count);

<update id="decrease">
  update t_storage
  set used = used + #{count,jdbcType=INTEGER}, residue = residue - #{count,jdbcType=INTEGER}
  where product_id = #{productId,jdbcType=BIGINT}
</update>


七、实例实战——业务代码Account-Module

AccountController.java:

@RestController
public class AccountController {
    @Autowired
    private AccountService accountService;

    @PostMapping(value = "/account/decrease")
    CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money){
        accountService.decrease(userId, money);
        return new CommonResult(200, "扣减账户余额成功");
    }
}

AccountServiceImpl:

@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
    @Resource
    private AccountMapper accountMapper;

    @Override
    public void decrease(Long userId, BigDecimal money) {
        log.info("------>account-service中扣除账户余额开始");
        accountMapper.decrease(userId, money);
        log.info("------>account-service中扣除账户余额结束");
    }
}

AccountMapper.java:

AccountMapper.xml:

void decrease(@Param("userId") Long userId, @Param("money") BigDecimal money);
<update id="decrease">
  update t_account
  set used = used + #{money,jdbcType=DECIMAL}, residue = residue - #{money,jdbcType=DECIMAL}
  where user_id = #{userID,jdbcType=BIGINT}
</update>

八、启动测试验证效果

1、分别启动3个服务,顺利启动,测试前数据库截图如下:

105.jpg

2、首先测试一组正常下单情况:

POST  
{
    "userId":1,
    "productId":1,
    "count":10,
    "money":100
}

请求结果:

107.jpg

此时我们看看三张数据库表的数据结果:

106.jpg

很顺利,没有问题!

3、模拟异常,在account服务中模拟超时异常,此时我们还没用到seata的@GlobalTransactional注解:

注意Feign默认的请求超时时间为1秒钟:

@RestController
public class AccountController {
    @Autowired
    private AccountService accountService;

    @PostMapping(value = "/account/decrease")
    CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money){
        try {
            TimeUnit.SECONDS.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        accountService.decrease(userId, money);
        return new CommonResult(200, "扣减账户余额成功");
    }
}

之后,我们重新访问刚刚的请求,结果是Feign访问超时:

Read timed out executing POST http://seata-account-service/account/decrease?userId=1&money=100

检查一下数据库,看看结果如何:

108.jpg

库存减了,钱被扣了,但是订单没有正常完成,还是未完成状态,这显然是由问题的!

更可怕的是,由于Feign有超时重试机制,有时候,账户余额甚至被多次扣款,这就更严重了!!

这就是分布式事务问题的表现!!!

4、还是上面的超时异常,但是本次我们使用Seata的@GlobalTransactional注解:

我们在OrderServiceImpl.java的方法体上,加上@GlobalTransactional注解:

//发现的所有Exception,通通回滚
@Override
@GlobalTransactional(name = "jgq-create-order", rollbackFor = Exception.class)
public Order create(Order order) {
    log.info("------>开始新建订单");
    order.setStatus(0);
    orderMapper.insert(order);
    System.out.println("此处已获得主键为:"+order.getId());

    log.info("------>订单微服务开始调用库存,做扣减");
    storageService.decrease(order.getProductId(), order.getCount());
    log.info("------>订单微服务开始调用库存,做扣减End");

    log.info("------>订单微服务开始调用账户,做扣减");
    accountService.decrease(order.getUserId(), order.getMoney());
    log.info("------>订单微服务开始调用账户,做扣减End");

    //修改原订单状态,从0到1
    log.info("------>修改订单状态开始");
    order.setStatus(1);
    orderMapper.updateByPrimaryKey(order);
    log.info("------>修改订单状态结束");
    return order;
}

此时我有访问了一次刚刚的链接,依然是超时异常,我还故意躲操作了两次!然后我们再看看数据库:

109.jpg

本应该像上面异常一样的错误数据(半成品)没有被写进数据库,严重的分布式事务被避免了,仅仅就是一个注解@GlobalTransactional;

到这里,我们使用Seata解决分布式事务的测试校验工作,就算是成功结束了!

个人此项目代码地址(持续更新):

https://github.com/jiguiquan/cloud2020

jiguiquan@163.com

文章作者信息...

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐