一、对TDengine最佳实践的肯定
TDengine在工业互联网的使用场景下,我认可的最佳实践也是如下:
业务模型 | TDengine模型 |
产品 | 超级表stable |
设备 | 子表table |
产品测点 | 超级表列 |
每创建产品时,就会创建一个超级表;
维护产品测点时,就是在维护超级表的表结构(列);
通过产品创建设备时,就是通过stable创建子表,会直接继承stable表结构,修改stable表结构,所有子表结构都会被修改!
在子表中的标签,可以维护一些固定的信息,如产品ID,位置,等;
但是在有些场景下,上面的模型就不一定适用,如:
-
业务系统的模型并不是:产品——设备模型;
-
业务系统中并没有测点管理,即没有办法提前知道有哪些点位信息;
这个时候,TDengine从v2.2.0.0版本开始,提供了无模式写入(Schemaless)功能;
二、什么是无模式写入(Schemaless)
1、什么是无模式写入(Schemaless)?
在物联网应用中,常会采集比较多的数据项,用于实现智能控制、业务分析、设备监控等。由于应用逻辑的版本升级,或者设备自身的硬件调整等原因,数据采集项就有可能比较频繁地出现变动。为了在这种情况下方便地完成数据记录工作,TDengine 提供调用 Schemaless 写入方式,可以免于预先创建超级表/子表的步骤,随着数据写入接口能够自动创建与数据对应的存储结构。并且在必要时,Schemaless 将自动增加必要的数据列,保证用户写入的数据可以被正确存储。
无模式写入方式建立的超级表及其对应的子表与通过 SQL 直接建立的超级表和子表完全没有区别,你也可以通过,SQL 语句直接向其中写入数据。需要注意的是,通过无模式写入方式建立的表,其表名是基于标签值按照固定的映射规则生成,所以无法明确地进行表意,缺乏可读性。
**注意:无模式写入会自动建表,不需要手动建表,手动建表的话可能会出现未知的错误。**
2、无模式写入(Schemless)当前支持的行协议有哪些?
TDengine 的无模式写入的行协议兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议、OpenTSDB 的 JSON 格式协议。但是使用这三种协议的时候,需要在 API 中指定输入内容使用解析协议的标准。
枚举值 |
说明 |
SML_LINE_PROTOCOL | InfluxDB 行协议(Line Protocol) |
SML_TELNET_PROTOCOL | OpenTSDB 文本行协议 |
SML_JSON_PROTOCOL | JSON 协议格式 |
无模式写入的示例代码如下,来源于官网:
public class SchemalessInsertTest { private static final String host = "127.0.0.1"; private static final String lineDemo = "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000"; private static final String telnetDemo = "stb0_0 1626006833 4 host=host0 interface=eth0"; private static final String jsonDemo = "{\"metric\": \"meter_current\",\"timestamp\": 1346846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"; public static void main(String[] args) throws SQLException { final String url = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata"; try (Connection connection = DriverManager.getConnection(url)) { init(connection); SchemalessWriter writer = new SchemalessWriter(connection); writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS); writer.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS); writer.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED); } } private static void init(Connection connection) throws SQLException { try (Statement stmt = connection.createStatement()) { stmt.executeUpdate("drop database if exists test_schemaless"); stmt.executeUpdate("create database if not exists test_schemaless"); stmt.executeUpdate("use test_schemaless"); } } }
官网的Schemaless模式固然好,但是美中不足的是,截止v3.0暂时不支持,JDBC-REST连接,这就很尴尬了!
三、另一种思路实现无模式写入Schemaless
1、自己实现Schemaless的思路:
-
1. 我们将超级表定义成一个单值模型,也即每条记录为:时间戳+采集值;
-
2. 在超级表的标签列中,要定义出设备ID、点位ID甚至点位物理量名称、点位分组等信息;
-
3. 这样同一设备不同点位的数据上报后,可以通过自动建表的语法向其对应子表中写入,在写入时指定tag值;
-
这种思想的核心点,由原来的“一个设备一张子表”变为了“一个点位一张子表”;
2、创建通用的超级表,并定义参数绑定的基础sql:
/** * 超级表:create stable iot_meters (ts timestamp, long_v bigint, dbl_v double, bool_v bool, str_v nchar(40)) tags(device_code binary(20), point_code binary(20)); * values中数值分别代表的意思: * 0:时间戳 * 1:long_v:long类型的值(存放int,tinyint,long类型的值) * 2:dbl_v:double类型的值(存放float,double类型的值) * 3:bool_v:boolean类型的值 * 4:str_v:字符串类型的值(如果要支持中文,就得用nchar,不可以用binary) * * tags中的数值代表的意思: * 0:device_code:设备的唯一标识 * 1:point_code:测点的标识 * 表名将使用device_code + “_” + point_code 拼接表示 */ private static String insertSql = "insert into ? using iot_meters tags(?,?) values(?,?,?,?,?)"; private static final Random random = new Random(System.currentTimeMillis());
3、编写serviceImpl的核心方法:
/** * 该方法将会创建10*10=100张表;(10个设备,每个设备10个测点) * 每张表中插入50行数据 * 所以共计会插入5000条数据; */ public void testSchemaless() { try( Connection connection = hikariCpHelper.getConnection(); TSDBPreparedStatement pstmt = connection.prepareStatement(insertSql).unwrap(TSDBPreparedStatement.class) ) { StopWatch stopWatch = new StopWatch("测试Schemaless"); stopWatch.start("开始测试"); for (int i = 1; i <= 10; i++) { for (int j = 1; j <= 10; j++) { pstmt.setTableName("device" + i + "_point" + j); pstmt.setTagString(0, "device"+i); pstmt.setTagString(1,"point"+j); ArrayList<Long> tsList = new ArrayList<>(); ArrayList<Long> longList = new ArrayList<>(); ArrayList<Double> dblList = new ArrayList<>(); ArrayList<Boolean> boolList = new ArrayList<>(); ArrayList<String> nStrList = new ArrayList<>(); for (int k = 1; k <= 30; k++) { tsList.add(System.currentTimeMillis() + k); longList.add(random.nextLong()); dblList.add(random.nextDouble()); boolList.add(random.nextBoolean()); nStrList.add(UUID.randomUUID().toString()); } pstmt.setTimestamp(0, tsList); pstmt.setLong(1, longList); pstmt.setDouble(2, dblList); pstmt.setBoolean(3, boolList); pstmt.setNString(4, nStrList, 30); pstmt.columnDataAddBatch(); } } pstmt.columnDataExecuteBatch(); stopWatch.stop(); System.out.println(stopWatch.isRunning()); System.out.println(stopWatch.prettyPrint()); System.out.println(stopWatch.shortSummary()); }catch (SQLException e) { e.printStackTrace(); }
4、我们执行该代码插入5000行数据:
false StopWatch '测试Schemaless': running time = 613202200 ns --------------------------------------------- ns % Task name --------------------------------------------- 613202200 100% 开始测试
去数据库中查询数据:
# 查询子表数量:100张 taos> select count(tbname) from iot_meters; count(tbname) | ======================== 100 | Query OK, 1 row(s) in set (0.004428s) #查询数据量:5000条 taos> select count(*) from iot_meters; count(*) | ======================== 5000 | Query OK, 1 row(s) in set (0.003566s)
5、一些常见的业务查询操作:
# 查看某一个设备下的所有测点:
taos> select count(point_code) from iot_meters where device_code = 'device1'; count(point_code) | ======================== 10 | Query OK, 1 row(s) in set (0.002558s) taos> select point_code from iot_meters where device_code = 'device1'; point_code | ======================= point1 | point2 | point3 | point4 | point5 | point6 | point7 | point8 | point9 | point10 | Query OK, 10 row(s) in set (0.005531s)
## 查询某个设备下,所有测点的最新值数据:
taos> select ts, last(dbl_v), device_code from iot_meters where device_code = 'device1' group by point_code; ts | last(dbl_v) | device_code | point_code | ==================================================================================================== 2022-11-22 15:00:19.742 | 0.444299298 | device1 | point1 | 2022-11-22 15:00:20.085 | 0.565672637 | device1 | point10 | 2022-11-22 15:00:20.061 | 0.853170612 | device1 | point2 | 2022-11-22 15:00:20.066 | 0.801004252 | device1 | point3 | 2022-11-22 15:00:20.068 | 0.179963580 | device1 | point4 | 2022-11-22 15:00:20.070 | 0.558506447 | device1 | point5 | 2022-11-22 15:00:20.074 | 0.954241448 | device1 | point6 | 2022-11-22 15:00:20.078 | 0.136518254 | device1 | point7 | 2022-11-22 15:00:20.079 | 0.178209132 | device1 | point8 | 2022-11-22 15:00:20.082 | 0.720409019 | device1 | point9 | Query OK, 10 row(s) in set (0.003931s)
### 降序查询某个设备的某个点位的一段时间内的历史数据:
taos> select ts, dbl_v, device_code, point_code from iot_meters where device_code = 'device1' and point_code = 'point2' and ts > now-1h order by ts desc limit 25; ts | dbl_v | device_code | point_code | ==================================================================================================== 2022-11-22 15:00:20.061 | 0.853170612 | device1 | point2 | 2022-11-22 15:00:20.060 | 0.360912442 | device1 | point2 | 2022-11-22 15:00:20.059 | 0.325616630 | device1 | point2 | 2022-11-22 15:00:20.058 | 0.703102396 | device1 | point2 | 2022-11-22 15:00:20.057 | 0.062190746 | device1 | point2 | 2022-11-22 15:00:20.056 | 0.240168596 | device1 | point2 | 2022-11-22 15:00:20.055 | 0.492822773 | device1 | point2 | 2022-11-22 15:00:20.054 | 0.747287895 | device1 | point2 | 2022-11-22 15:00:20.053 | 0.012568579 | device1 | point2 | 2022-11-22 15:00:20.052 | 0.885381570 | device1 | point2 | 2022-11-22 15:00:20.051 | 0.453957521 | device1 | point2 | 2022-11-22 15:00:20.050 | 0.137956986 | device1 | point2 | 2022-11-22 15:00:20.049 | 0.805163483 | device1 | point2 | 2022-11-22 15:00:20.048 | 0.876685443 | device1 | point2 | 2022-11-22 15:00:20.047 | 0.058523354 | device1 | point2 | 2022-11-22 15:00:20.046 | 0.544848255 | device1 | point2 | 2022-11-22 15:00:20.045 | 0.963466023 | device1 | point2 | 2022-11-22 15:00:20.044 | 0.710046874 | device1 | point2 | 2022-11-22 15:00:20.043 | 0.937116446 | device1 | point2 | 2022-11-22 15:00:20.042 | 0.225534412 | device1 | point2 | 2022-11-22 15:00:20.041 | 0.233205524 | device1 | point2 | 2022-11-22 15:00:20.040 | 0.182387527 | device1 | point2 | 2022-11-22 15:00:20.039 | 0.054217626 | device1 | point2 | 2022-11-22 15:00:20.038 | 0.452943190 | device1 | point2 | 2022-11-22 15:00:20.037 | 0.807384826 | device1 | point2 | Query OK, 25 row(s) in set (0.005978s)
#### 按照一定的维度对数据进行分组查询:
taos> select count(*) from iot_meters group by point_code; count(*) | point_code | =============================================== 500 | point1 | 500 | point10 | 500 | point2 | 500 | point3 | 500 | point4 | 500 | point5 | 500 | point6 | 500 | point7 | 500 | point8 | 500 | point9 | Query OK, 10 row(s) in set (0.007220s) taos> select count(*) from iot_meters group by device_code; count(*) | device_code | =============================================== 500 | device1 | 500 | device10 | 500 | device2 | 500 | device3 | 500 | device4 | 500 | device5 | 500 | device6 | 500 | device7 | 500 | device8 | 500 | device9 | Query OK, 10 row(s) in set (0.002853s)
四、常见问题解决
1、Unable to unwrap to class com.taosdata.jdbc.TSDBPreparedStatement
大概率是因为使用了不被支持的JDBC-RS,截止目前v3.0为止,JDBC-RS还不支持通过参数绑定的方式写入数据。
那么就只能通过JNI的方式写入数据了!
实在要用JDBC-RS,那么就只能通过拼接sql的方式进行写入了!
2、no taos in java.library.path
本地没有安装taos客户端;
如果已经安装,那么:
Windows 下可以将 C:\TDengine\driver\taos.dll 拷贝到 C:\Windows\System32\ 目录下;
Linux 下将建立如下软链 ln -s /usr/local/taos/driver/libtaos.so.x.x.x.x /usr/lib/libtaos.so 即可
3、JNI ERROR (2354): Client and server's time is not synchronized
客户端和服务端时间不一致;
可能是因为自己的虚拟机休眠了,之后又没有自动与时间服务器同步时间;
# 安装ntp和ntpdate [root@td3 ~]# yum -y install ntp ntpdate # 与cn.pool.ntp.org同步时间 [root@td3 ~]# ntpdate cn.pool.ntp.org 22 Nov 03:12:55 ntpdate[1031]: adjust time server 203.107.6.88 offset -0.000584 sec [root@td3 ~]# ntpstat synchronised to NTP server (119.28.183.184) at stratum 3 time correct to within 53 ms polling server every 64 s
4、Tdengine中显示的时间时区不对:
因为TDengine使用的是时间戳,所以不存在时区问题;
只需要修改所在机器的时区,这样显示就不会有问题了!
ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime