数据目的地
更新时间: 2023-04-19

数据目的地是规则接收到设备消息,并通过规则路由转发到的最终数据操作,目的地可以是存储、计算、消息队列等云产品,目前规则引擎支持如下几种数据目的地操作。

  • 发送到另一个设备,可以实现设备联动控制以及业务服务器端接收设备消息的场景

  • 时序数据库TSDB,将设备消息存储写入到TSDB中,业务应用可以查询TSDB获取设备的历史数据

  • 用户KAFKA,将设备消息转发到用户Kafka中,业务应用可以实时获取设备消息进行流式计算、实时统计等

  • 定制化服务,私有化部署情况下支持根据需求定制化插件将消息发往客户指定的目的地服务

# 发布到另一个设备

您可以设置将订阅接收到的数据,转发到另一个设备,实现M2M控制场景,将服务视为一台设备时亦可以将设备消息转发给业务服务。

规则引擎支持将消息转发给当前实例下任意指定设备。

# 数据转发流程

假设目的地是另外一个设备B,该规则是当设备A更新其温度属性时触发向设备B下发一条控制指令。

re-09m2m.png

此方式优势是可以方便、快速的将消息在设备与设备之间、设备与服务之间、服务与服务之间流转(实际使用中将服务虚拟化为设备),以实现设备场景联动、远程下发指令、实时订阅设备状态数据等业务应用需求。

# 操作流程

例如规则引擎订阅接收到设备A上报的属性消息(假设对应的G+link语义为thing/product01/deviceA/property/post):

{
    "reqId":"442c1da4-9d3a-4f9b-a6e9-bfe858e4ac43",
    "method":"thing.property.post",
    "version":"1.0",
    "timestamp":1610430718000,
    "properties":{
        "temperature": 36.2
    }
}

目的地是名称为选中设备B的 设备可写属性更新接口(假设对应的G+link语义为thing/product02/deviceB/property/invoke)

通过查询语句将消息转换为G+link中服务设备控制格式的输出:

{
    "reqId":"442c1da4-9d3a-4f9b-a6e9-bfe858e4ac43",
    "method":"thing.property.invoke",
    "version":"1.0",
    "timestamp":1610430718000,
    "properties":{
        "temperature":36.2
    }
}

则设备B可以接收到如上格式的消息。

备注:以上转换对应的查询语句为

    {
        "reqId":$.reqId,
        "method":"thing.property.invoke",
        "version":"1.0",
        "timestamp":$.timestamp,
        "properties":{
            "temperature":$.properties.temperature
        }
    }

# 写入用户Kafka

可以使用规则引擎将设备数据转发到Kafka主题中,服务端再从消息服务主题中消费消息,实现设备端与服务端之间高性能的消息闭环传输。

# 添加Kafka到数据目的地列表

使用其他账号下Kafka时,需要先将对应的Kafka添加到数据目的地列表中,之后在配置数据目的地时选中。

详细添加步骤见《数据目的地管理》章节。

# 数据转发流程

设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到消息服务的主题中。 然后,您的应用服务器调用消息服务的接口消费消息。

re-10m2kafka.png

此方式使用消息队列保证消息的可靠性,避免了服务端不可用时导致消息丢失。同时,消息服务在处理大量消息并发时,有削峰填谷的作用,保证服务端不会因为突然的并发压力导致服务不可用。物联网平台与消息服务的结合,可以实现设备端与服务端之间高性能的消息闭环传输。

# 操作流程

例如规则引擎订阅接收到设备A上报的属性消息(假设对应的G+link语义为thing/product01/deviceA/property/post):

{
    "reqId":"442c1da4-9d3a-4f9b-a6e9-bfe858e4ac43",
    "method":"thing.property.post",
    "version":"1.0",
    "timestamp":1610430718000,
    "properties":{
        "temperature": 36.2
    }
}

假设目的地是用户的Kafka主题device2service

应用服务器通过消费device2service主题可实时处理设备数据,当服务器故障时,kafka中未消费的数据可以在服务恢复后继续处理。

# 存储到时序数据库TSDB

您可以配置规则,将数据转发到时序数据库TSDB的实例中存储。

# 数据转发流程

设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到时序数据库TSDB。 然后,您的应用服务器调用TSDB的查询接口,获取设备的历史数据。

时序数据库是最适合存储设备属性、状态类时序数据的数据库,设备数据持久化存储到TSDB,业务应用从数据库中查询数据完成业务操作。

re-11m2tsdb.png

# 操作流程

写入时序数据的数据格式必须符合时序数据库写入接口的要求,即设备上报的原始消息经过查询语句的转换之后必须符合如下示例格式,才能正常写入,否则不能被成功写入TSDB。具体的转换语句可参考《常用查询语句示例》章节,G+link协议各接口数据可以已标准格式写入TSDB,用户也可以参考标准写入的语句自行设置写入格式。

写入TSDB数据格式示例

{
    "datapoints": [{
        "metric": "cpu_idle",
        "tags": {
            "host": "server1",
            "rack": "rack1"
        },
        "timestamp": 1465376157007,
        "value": 51
    }, {
        "metric": "cpu_idle",
        "tags": {
            "host": "server2",
            "rack": "rack2"
        },
        "values": [
            [1465376269769, 67],
            [1465376325057, 60]
        ]
    }]
}

数据格式说明

参数名称 参数类型 是否必须 说明
datapoints List<Datapoint> 必须 datapoint列表,由Datapoint对象组成的数组

Datapoint对象

参数名称 参数类型 是否必须 说明
metric String 必须 metric的名称
field String 可选 field的名称,默认名称为value。不同的field支持不同的数据类型写入。对于同一个field,如果写入了某个数据类型的value之后,相同的field不允许写入其他数据类型
tags Object 必须 data point对应的所有tag,Object中的一对key-value表示一个tag的key-value
type String 可选 目前支持Long/Double/String/Bytes。代表value字段的类型,如果不填会根据解析出来的类型为准。bytes是种特殊类型,表示value是经过base64编码后的String,TSDB存储时会反编码成byte数组存储
timestamp Int 可选 Unix时间戳,单位是毫秒;如果timestamp为空,value不为空,timestamp自动填入系统当前时间;如果timestamp的位数小于等于10位,将认为精度是秒,自动乘以1000;timestamp+value与values两者必须二选一
value Int/Double/String 可选 data point的值,timestamp+value与values两者必须二选一。当写入的metric、field、tags、timestamp都相同时,后写入的value会覆盖先写入的value
values List<List<Any>> 可选 对于相同的metric+tags的data point,可以通过合并成一个values的List来减少payload,values是个二维数组,里面的一维必须是两个元素,第一个元素是timestamp,是unix时间戳,类型是Int,第二个元素是value,类型是Int/Double/String;如果timestamp的位数小于等于10位,将认为精度是秒,自动乘以1000

# 定制化服务

私有化部署情况下,平台支持以二次开发插件的方式将消息通过规则引擎发布到客户已有的计算、存储服务以及指定的业务接口。