网关如何接入物联网平台
更新时间: 2023-04-13

# 接入流程

  1. 创建设备获得设备的接入鉴权信息,包含productKey、deviceName、deviceSecret、endpoint、instanceId五项信息;
  2. 设备向物联网平台鉴权中心请求设备接入资源,鉴权通过后返回设备的真实接入MQTT连接
  3. 设备使用获取的接入资源配置信息连接平台
  4. 连接平台使用平台定义的G+link协议发送和接收消息

# 设备接入

使用获得的设备鉴权信息(productKey、deviceName、deviceSecret、endpoint、instanceId)请求获取资源连接信息接口

  • 基本信息

Endpoint: 平台实例详情页查看 连接地址

Path: /v1/devices/:instanceId/:productkey/:deviceName/resources

Method: POST

接口描述:

  • 请求参数

Headers

参数名称 参数值 是否必须 示例 备注
Content-Type application/json
signature
expiryTime 分钟级时间戳,当前时间前后十分钟有效

路径参数

参数名称 示例 备注
instanceId
productkey
deviceName

Body

名称类型是否必须默认值备注其他信息
resourceTypestring必须MQTT/EVS

  • 返回数据
名称类型是否必须默认值备注其他信息
resourceTypestring非必须
contentobject非必须

使用返回的MQTT资源配置信息连接平台

# 基于G+link交互协议进行数据交互

G+link交互协议是平台定义的一套设备与平台交互规范,G+link与物模型搭配使用。

首先在产品的功能定义中配置物模型,描述设备具有的属性、事件、服务,之后根据物模型中定义的功能项组装G+link协议中的消息payload实现设备与平台的双向交互。

平台将设备划分为直连设备、网关、子设备三种接入类型,所有设备均具有一套独立接口,设备接口不依赖与设备连接,即子设备同样具有自己的一套接口。

# 网关与子设备消息上下行

# 设备的定义

在平台中网关与网关下连接的子设备均被视为设备,需要在平台中分别创建网关和子设备产品。

设备采用网关方式接入时,需要在平台创建与实际设备对应的网关和设备。

# 网关设备与平台交互

网关自身作为一台设备,使用从平台获得的设备鉴权信息连接平台,并基于G+link协议与平台进行数据上下行交互。

  • 上报网关运行状态、事件(告警、日志)等
  • 接收云端下发给网关的操作指令、配置信息等

# 子设备与平台交互

与网关连接的设备(子设备),经网关进行协议解析并转换为G+link协议规定的消息格式后,由网关代理子设备进行上下行数据交互。

当前物联网平台暂未支持网关和子设备关系在平台进行绑定的功能(开发中),实际使用中需将网关和设备全部创建为直连设备,采用此种方式创建的子设备同样具有独立的连接配置信息。

  • 使用网关通道进行上报

平台为网关设备配置高级权限,允许网关使用子设备的接口(topic)上报消息以及订阅子设备的接口获得云端指令

特别注意:此方式下网关具备较高的权限,可以上报任意子设备的消息,包括未连接到网关的子设备。

  • 使用子设备通道进行上报

网关不启用高级权限,使用子设备的配置信息完成与平台的接入鉴权,获得子设备的连接信息,为子设备建立独立的连接进行消息上报。

此方式下需要网关建立多个与平台的连接client,对网关的资源消耗以及网关维持多个连接的逻辑复杂度会提高。

两种方式是目前解决子设备接入的两种实现方式,可根据实际需要选用。

# 操作指南

  1. 创建物联网平台实例,获得实例instanceId

image2021-4-27_21-16-5.png

  1. 进入实例详情页,获得设备接入地址endpoint

image2021-4-27_22-18-58.png

  1. 创建产品

image2021-4-27_22-19-50.png

  1. 进入产品详情页,定义物模型(属性、事件、服务)

image2021-4-27_22-21-1.png

  1. 返回到实例详情页,切换到设备菜单,创建设备

image2021-4-27_22-22-7.png

  1. 获得设备的接入鉴权信息productKey、deviceName、deviceSecret

image2021-4-27_22-23-30.png

  1. 设备向物联网平台请求设备接入资源,保存设备接入资源配置信息,请求设备获取资源连接信息接口,见下方示例代码

  2. 连接平台发送和接收消息,见示例代码

  3. 连接服务端订阅接收消息验证消息是否上报成功

  4. 向网关下发指令配置(接口层面暂不支持,采用具备高级权限设备连接后pub消息可实现验证,具体方式对接是咨询平台PM)

# 参考代码

设备端python示例代码

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
签名算法:
1、将接口路径、分钟级时间戳和请求参数(body的JOSN字符串)拼接成待加密字符串authStringPrefix,参数之间以换行符"\n"连接,没有请求参数时,填"null"

示例:
    /v1/devices/zfm8n1p5y1qzc09a/test01/test01/resources
    26944410
    {"resourceType":"MQTT"}
    或
    /v1/devices/zfm8n1p5y1qzc09a/test01/test01/resources
    26944410
    null
特别注意:拼接字符串中不能包含空格,body转JSON字符串时需去掉空格

2、分别获取deviceSecret、authStringPrefix的字节数组

2、使用deviceSecret数组对上述字符串数组使用HmacSHA256进行加密

3、将加密后的byte数组进行Base64编码得到signature,并进行urlencode处理

4、将最终得到的signature和分钟级时间戳放到请求的header中

'''
import base64
import json
import requests
import hashlib
import hmac
from urllib.parse import quote
import time
import psutil
import os
import platform
import time
import sys
import paho.mqtt.client as mqtt
import uuid
import random
import json

# 设备配置信息
INSTANCE = "tb78785v8119053e" # 物理网平台实例ID
HOST = "10.70.25.9:8372" # 连接地址
PRODUCT_KEY = "zfv4pdw5" # 产品唯一标识,ProductKey
DEVICE_NAME = 'test032901' # 设备名称,DeviceName
DEVICE_SECRET = 'q175yqchgn6iu55r' # 设备密钥,DeviceSecret

# 生成签名
def do_sign(path, expiryTime, body):
    authStringPrefix = f'{path}\n{expiryTime}\n{body}'
    print(authStringPrefix)

    deviceSecretBytes = DEVICE_SECRET.encode('utf-8')  # 获取密钥字节数组
    print(deviceSecretBytes)

    authStringPrefix = authStringPrefix.encode('utf-8')  # 获取拼接字符串字节数组
    print(authStringPrefix)

    signature = hmac.new(deviceSecretBytes,
                         authStringPrefix, hashlib.sha256).digest()  # 对拼接字符串进行HmacSHA256加密,获取加密字节数组
    print(signature)

    signature = base64.b64encode(
        signature).decode('utf-8')  # 将加密得到的字节数组进行base64编码

    signature = quote(signature, safe='')

    print(signature)
    print('----------------------------------------')

    return signature


count = 0
isconnect = False

# 定义last_state变量,用于存储设备最新状态
last_state = {}
# 内存
last_state['memory_usage'] = 0
last_state['memory_total'] = 0
last_state['memory_free'] = 0
# CPU
last_state['cpu_usage'] = 0
last_state['cpu_core_number'] = 0
# 磁盘
last_state['disk_usage'] = 0
# 系统信息,win->'nt'; Linux->'posix'
last_state['system_info'] = os.name+'/'+sys.platform
# last_state['gateway_version']=platform.version()
last_state['gateway_version'] = '0.0.0'


# 获取本机磁盘使用率和剩余空间G信息
def get_disk_info():
    # 循环磁盘分区
    used_disk_size = 0
    total_disk_size = 0
    for disk in psutil.disk_partitions():
        # 读写方式 光盘 or 有效磁盘类型
        # if 'cdrom' in disk.opts or disk.fstype == '':
        #     continue
        disk_info = psutil.disk_usage(disk.device)
        # 磁盘已使用空间,单位G
        used_disk_size = used_disk_size+disk_info.used/1024/1024/1024
        # 当前磁盘总空间
        total_disk_size = total_disk_size+disk_info.total/1024/1024/1024

    # 计算磁盘总的使用率
    last_state['disk_usage'] = round((used_disk_size/total_disk_size)*100, 2)

# 获取CPU信息
def get_cpu_info():
    # CPU使用率
    last_state['cpu_usage'] = round(psutil.cpu_percent(interval=1), 2)
    # CPU内核数量
    last_state['cpu_core_number'] = round(psutil.cpu_count(logical=True), 2)

# 获取内存信息
def get_memory_info():
    virtual_memory = psutil.virtual_memory()
    # 内存使用率
    last_state['memory_usage'] = round(virtual_memory.percent, 2)
    # 内存总量
    last_state['memory_total'] = round(virtual_memory.total/1024/1024/1024, 2)
    # 剩余内存
    last_state['memory_free'] = round(virtual_memory.free/1024/1024/1024, 2)


def update_info():
    get_disk_info()
    get_cpu_info()
    get_memory_info()
    # print(last_state)


def on_connect(client, userdata, flags, rc):
    global isconnect
    print("Connected with result code "+str(rc))
    # client.subscribe("#")
    isconnect = True


def on_publish(client, userdata, mid):
    print('MQTT message pub:'+str(mid))


def on_message(client, userdata, msg):
    global count
    count += 1
    print(str(count)+" 主题:"+msg.topic+" 消息:"+str(msg.payload.decode('utf-8')))


def on_subscribe(client, userdata, mid, granted_qos):
    print("On Subscribed: qos = %d" % granted_qos)


def on_disconnect(client, userdata, rc):
    global isconnect
    if rc != 0:
        print("Unexpected disconnection %s" % rc)
    else:
        print("expected disconnection %s" % rc)
    isconnect = False
    time.sleep(3)
    doreq()

# 发起连接请求
def doreq():
    expiryTime = str(int(time.time()/60))
    body = {"resourceType": "MQTT"}
    path = "/v1/devices/"+INSTANCE+'/'+PRODUCT_KEY+'/'+DEVICE_NAME+"/resources"
    # body = {}
    # path = "/v1/devices/"+INSTANCE+'/'+PRODUCT_KEY+'/'+DEVICE_NAME+"/auth"

    # body = json.dumps(body, separators=(',', ':')) if body !='{}' else body
    body = json.dumps(body, separators=(',', ':'))

    signature = do_sign(path, expiryTime, body)

    headers = {}
    headers['Content-Type'] = "application/json"
    headers['signature'] = signature
    headers['expiryTime'] = expiryTime
    print(headers)

    url = f'http://{HOST}{path}'

    r = requests.post(url, headers=headers, data=body)

    print(r)
    print(r.text)
    # 日志文件
    file = "log.txt"

    with open(file, 'a+') as f:
        f.write(r.text+'\n')  # 加\n换行显示

    # topic = f'thing/7bcdf8071dee22208a3b6091bf2b9531/zfv4pdw5/test032902/property/post/request'
    topic = f'thing/{PRODUCT_KEY}/{DEVICE_NAME}/property/post'
    json_r = json.loads(r.text)
    content = json_r['content']
    mqtt_client_id = content['clientId']
    mqtt_username = content['username']
    mqtt_password = content['password']
    mqtt_host = content['broker']
    mqtt_port = 1883
    client = mqtt.Client(mqtt_client_id)
    client.username_pw_set(mqtt_username, mqtt_password)
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_publish = on_publish
    client.on_subscribe = on_subscribe
    client.on_disconnect = on_disconnect
    client.connect(mqtt_host, mqtt_port, 60)
    client.loop_start()

    time.sleep(3)

    while isconnect:
        update_info()
        payload = {
            "reqId": str(uuid.uuid4()),
            "method": "thing.property.post",
            "version": "1.0",
            "timestamp": int(time.time()*1000),
            "properties": last_state
        }
        payload = json.dumps(payload)
        client.publish(topic, payload, 0)
        print('MQTT message published:\n%s' % payload)
        time.sleep(5)


if __name__ == "__main__":
    start = time.time()
    doreq()
    print("耗时===========================", time.time()-start)