关于TDengine-Schemaless的思考与使用

一、对TDengine最佳实践的肯定

TDengine在工业互联网的使用场景下,我认可的最佳实践也是如下:

业务模型 TDengine模型
产品 超级表stable
设备 子表table
产品测点 超级表列
  • 每创建产品时,就会创建一个超级表;

  • 维护产品测点时,就是在维护超级表的表结构(列);

  • 通过产品创建设备时,就是通过stable创建子表,会直接继承stable表结构,修改stable表结构,所有子表结构都会被修改!

  • 在子表中的标签,可以维护一些固定的信息,如产品ID,位置,等;

但是在有些场景下,上面的模型就不一定适用,如:

  • 业务系统的模型并不是:产品——设备模型;

  • 业务系统中并没有测点管理,即没有办法提前知道有哪些点位信息;

这个时候,TDengine从v2.2.0.0版本开始,提供了无模式写入(Schemaless)功能;


二、什么是无模式写入(Schemaless)

无模式写入:https://docs.taosdata.com/reference/schemaless/

1、什么是无模式写入(Schemaless)?

        在物联网应用中,常会采集比较多的数据项,用于实现智能控制、业务分析、设备监控等。由于应用逻辑的版本升级,或者设备自身的硬件调整等原因,数据采集项就有可能比较频繁地出现变动。为了在这种情况下方便地完成数据记录工作,TDengine 提供调用 Schemaless 写入方式,可以免于预先创建超级表/子表的步骤,随着数据写入接口能够自动创建与数据对应的存储结构。并且在必要时,Schemaless 将自动增加必要的数据列,保证用户写入的数据可以被正确存储。

        无模式写入方式建立的超级表及其对应的子表与通过 SQL 直接建立的超级表和子表完全没有区别,你也可以通过,SQL 语句直接向其中写入数据。需要注意的是,通过无模式写入方式建立的表,其表名是基于标签值按照固定的映射规则生成,所以无法明确地进行表意,缺乏可读性。

**注意:无模式写入会自动建表,不需要手动建表,手动建表的话可能会出现未知的错误。**

2、无模式写入(Schemless)当前支持的行协议有哪些?

        TDengine 的无模式写入的行协议兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议、OpenTSDB 的 JSON 格式协议。但是使用这三种协议的时候,需要在 API 中指定输入内容使用解析协议的标准。

枚举值
说明
SML_LINE_PROTOCOL InfluxDB 行协议(Line Protocol)
SML_TELNET_PROTOCOL OpenTSDB 文本行协议
SML_JSON_PROTOCOL JSON 协议格式

无模式写入的示例代码如下,来源于官网:

public class SchemalessInsertTest {
    private static final String host = "127.0.0.1";
    private static final String lineDemo = "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000";
    private static final String telnetDemo = "stb0_0 1626006833 4 host=host0 interface=eth0";
    private static final String jsonDemo = "{\"metric\": \"meter_current\",\"timestamp\": 1346846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";

    public static void main(String[] args) throws SQLException {
        final String url = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata";
        try (Connection connection = DriverManager.getConnection(url)) {
            init(connection);

            SchemalessWriter writer = new SchemalessWriter(connection);
            writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS);
            writer.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
            writer.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
        }
    }

    private static void init(Connection connection) throws SQLException {
        try (Statement stmt = connection.createStatement()) {
            stmt.executeUpdate("drop database if exists test_schemaless");
            stmt.executeUpdate("create database if not exists test_schemaless");
            stmt.executeUpdate("use test_schemaless");
        }
    }
}

官网的Schemaless模式固然好,但是美中不足的是,截止v3.0暂时不支持,JDBC-REST连接,这就很尴尬了!


三、另一种思路实现无模式写入Schemaless

1、自己实现Schemaless的思路:

  • 1. 我们将超级表定义成一个单值模型,也即每条记录为:时间戳+采集值;

  • 2. 在超级表的标签列中,要定义出设备ID、点位ID甚至点位物理量名称、点位分组等信息;

  • 3. 这样同一设备不同点位的数据上报后,可以通过自动建表的语法向其对应子表中写入,在写入时指定tag值;

  • 这种思想的核心点,由原来的“一个设备一张子表”变为了“一个点位一张子表”

2、创建通用的超级表,并定义参数绑定的基础sql:

/**
 * 超级表:create stable iot_meters (ts timestamp, long_v bigint, dbl_v double, bool_v bool, str_v nchar(40)) tags(device_code binary(20), point_code binary(20));
 * values中数值分别代表的意思:
 * 0:时间戳
 * 1:long_v:long类型的值(存放int,tinyint,long类型的值)
 * 2:dbl_v:double类型的值(存放float,double类型的值)
 * 3:bool_v:boolean类型的值
 * 4:str_v:字符串类型的值(如果要支持中文,就得用nchar,不可以用binary)
 *
 * tags中的数值代表的意思:
 * 0:device_code:设备的唯一标识
 * 1:point_code:测点的标识
 * 表名将使用device_code + “_” + point_code 拼接表示
 */
private static String insertSql = "insert into ? using iot_meters tags(?,?) values(?,?,?,?,?)";

private static final Random random = new Random(System.currentTimeMillis());

3、编写serviceImpl的核心方法:

/**
 * 该方法将会创建10*10=100张表;(10个设备,每个设备10个测点)
 * 每张表中插入50行数据
 * 所以共计会插入5000条数据;
 */
public void testSchemaless() {
    try(
        Connection connection = hikariCpHelper.getConnection();
        TSDBPreparedStatement pstmt = connection.prepareStatement(insertSql).unwrap(TSDBPreparedStatement.class)
    ) {
        StopWatch stopWatch = new StopWatch("测试Schemaless");
        stopWatch.start("开始测试");
        for (int i = 1; i <= 10; i++) {

            for (int j = 1; j <= 10; j++) {
                pstmt.setTableName("device" + i + "_point" + j);
                pstmt.setTagString(0, "device"+i);
                pstmt.setTagString(1,"point"+j);

                ArrayList<Long> tsList = new ArrayList<>();
                ArrayList<Long> longList = new ArrayList<>();
                ArrayList<Double> dblList = new ArrayList<>();
                ArrayList<Boolean> boolList = new ArrayList<>();
                ArrayList<String> nStrList = new ArrayList<>();

                for (int k = 1; k <= 30; k++) {
                    tsList.add(System.currentTimeMillis() + k);
                    longList.add(random.nextLong());
                    dblList.add(random.nextDouble());
                    boolList.add(random.nextBoolean());
                    nStrList.add(UUID.randomUUID().toString());
                }

                pstmt.setTimestamp(0, tsList);
                pstmt.setLong(1, longList);
                pstmt.setDouble(2, dblList);
                pstmt.setBoolean(3, boolList);
                pstmt.setNString(4, nStrList, 30);

                pstmt.columnDataAddBatch();
            }
        }
        pstmt.columnDataExecuteBatch();
        stopWatch.stop();
        System.out.println(stopWatch.isRunning());
        System.out.println(stopWatch.prettyPrint());
        System.out.println(stopWatch.shortSummary());
    }catch (SQLException e) {
        e.printStackTrace();
    }

4、我们执行该代码插入5000行数据:

false
StopWatch '测试Schemaless': running time = 613202200 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
613202200  100%  开始测试

去数据库中查询数据:

# 查询子表数量:100张
taos> select count(tbname) from iot_meters;
     count(tbname)     |
========================
                   100 |
Query OK, 1 row(s) in set (0.004428s)

#查询数据量:5000条
taos> select count(*) from iot_meters;
       count(*)        |
========================
                  5000 |
Query OK, 1 row(s) in set (0.003566s)

5、一些常见的业务查询操作:

# 查看某一个设备下的所有测点:

taos> select count(point_code) from iot_meters where device_code = 'device1';
   count(point_code)   |
========================
                    10 |
Query OK, 1 row(s) in set (0.002558s)

taos> select point_code from iot_meters where device_code = 'device1';
      point_code      |
=======================
 point1               |
 point2               |
 point3               |
 point4               |
 point5               |
 point6               |
 point7               |
 point8               |
 point9               |
 point10              |
Query OK, 10 row(s) in set (0.005531s)

## 查询某个设备下,所有测点的最新值数据:

taos> select ts, last(dbl_v), device_code from iot_meters where device_code = 'device1' group by point_code;
           ts            |        last(dbl_v)        |     device_code      |      point_code      |
====================================================================================================
 2022-11-22 15:00:19.742 |               0.444299298 | device1              | point1               |
 2022-11-22 15:00:20.085 |               0.565672637 | device1              | point10              |
 2022-11-22 15:00:20.061 |               0.853170612 | device1              | point2               |
 2022-11-22 15:00:20.066 |               0.801004252 | device1              | point3               |
 2022-11-22 15:00:20.068 |               0.179963580 | device1              | point4               |
 2022-11-22 15:00:20.070 |               0.558506447 | device1              | point5               |
 2022-11-22 15:00:20.074 |               0.954241448 | device1              | point6               |
 2022-11-22 15:00:20.078 |               0.136518254 | device1              | point7               |
 2022-11-22 15:00:20.079 |               0.178209132 | device1              | point8               |
 2022-11-22 15:00:20.082 |               0.720409019 | device1              | point9               |
Query OK, 10 row(s) in set (0.003931s)

### 降序查询某个设备的某个点位的一段时间内的历史数据:

taos> select ts, dbl_v, device_code, point_code from iot_meters where device_code = 'device1' and point_code = 'point2' and ts > now-1h order by ts desc limit 25;
           ts            |           dbl_v           |     device_code      |      point_code      |
====================================================================================================
 2022-11-22 15:00:20.061 |               0.853170612 | device1              | point2               |
 2022-11-22 15:00:20.060 |               0.360912442 | device1              | point2               |
 2022-11-22 15:00:20.059 |               0.325616630 | device1              | point2               |
 2022-11-22 15:00:20.058 |               0.703102396 | device1              | point2               |
 2022-11-22 15:00:20.057 |               0.062190746 | device1              | point2               |
 2022-11-22 15:00:20.056 |               0.240168596 | device1              | point2               |
 2022-11-22 15:00:20.055 |               0.492822773 | device1              | point2               |
 2022-11-22 15:00:20.054 |               0.747287895 | device1              | point2               |
 2022-11-22 15:00:20.053 |               0.012568579 | device1              | point2               |
 2022-11-22 15:00:20.052 |               0.885381570 | device1              | point2               |
 2022-11-22 15:00:20.051 |               0.453957521 | device1              | point2               |
 2022-11-22 15:00:20.050 |               0.137956986 | device1              | point2               |
 2022-11-22 15:00:20.049 |               0.805163483 | device1              | point2               |
 2022-11-22 15:00:20.048 |               0.876685443 | device1              | point2               |
 2022-11-22 15:00:20.047 |               0.058523354 | device1              | point2               |
 2022-11-22 15:00:20.046 |               0.544848255 | device1              | point2               |
 2022-11-22 15:00:20.045 |               0.963466023 | device1              | point2               |
 2022-11-22 15:00:20.044 |               0.710046874 | device1              | point2               |
 2022-11-22 15:00:20.043 |               0.937116446 | device1              | point2               |
 2022-11-22 15:00:20.042 |               0.225534412 | device1              | point2               |
 2022-11-22 15:00:20.041 |               0.233205524 | device1              | point2               |
 2022-11-22 15:00:20.040 |               0.182387527 | device1              | point2               |
 2022-11-22 15:00:20.039 |               0.054217626 | device1              | point2               |
 2022-11-22 15:00:20.038 |               0.452943190 | device1              | point2               |
 2022-11-22 15:00:20.037 |               0.807384826 | device1              | point2               |
Query OK, 25 row(s) in set (0.005978s)

#### 按照一定的维度对数据进行分组查询:

taos> select count(*) from iot_meters group by point_code;
       count(*)        |      point_code      |
===============================================
                   500 | point1               |
                   500 | point10              |
                   500 | point2               |
                   500 | point3               |
                   500 | point4               |
                   500 | point5               |
                   500 | point6               |
                   500 | point7               |
                   500 | point8               |
                   500 | point9               |
Query OK, 10 row(s) in set (0.007220s)

taos> select count(*) from iot_meters group by device_code;
       count(*)        |     device_code      |
===============================================
                   500 | device1              |
                   500 | device10             |
                   500 | device2              |
                   500 | device3              |
                   500 | device4              |
                   500 | device5              |
                   500 | device6              |
                   500 | device7              |
                   500 | device8              |
                   500 | device9              |
Query OK, 10 row(s) in set (0.002853s)


四、常见问题解决

1、Unable to unwrap to class com.taosdata.jdbc.TSDBPreparedStatement

大概率是因为使用了不被支持的JDBC-RS,截止目前v3.0为止,JDBC-RS还不支持通过参数绑定的方式写入数据。

那么就只能通过JNI的方式写入数据了!

实在要用JDBC-RS,那么就只能通过拼接sql的方式进行写入了!

2、no taos in java.library.path

本地没有安装taos客户端;

如果已经安装,那么:

  • Windows 下可以将 C:\TDengine\driver\taos.dll 拷贝到 C:\Windows\System32\ 目录下;

  • Linux 下将建立如下软链 ln -s /usr/local/taos/driver/libtaos.so.x.x.x.x /usr/lib/libtaos.so 即可

3、JNI ERROR (2354): Client and server's time is not synchronized

客户端和服务端时间不一致;

可能是因为自己的虚拟机休眠了,之后又没有自动与时间服务器同步时间;

# 安装ntp和ntpdate
[root@td3 ~]# yum -y install ntp ntpdate

# 与cn.pool.ntp.org同步时间
[root@td3 ~]# ntpdate cn.pool.ntp.org
22 Nov 03:12:55 ntpdate[1031]: adjust time server 203.107.6.88 offset -0.000584 sec

[root@td3 ~]# ntpstat
synchronised to NTP server (119.28.183.184) at stratum 3
   time correct to within 53 ms
   polling server every 64 s

4、Tdengine中显示的时间时区不对:

因为TDengine使用的是时间戳,所以不存在时区问题;

只需要修改所在机器的时区,这样显示就不会有问题了!

ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

jiguiquan@163.com

文章作者信息...

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐