查询设备历史数据
更新时间: 2023-04-10
应用侧可以自行实现设备数据查询接口,以下使用python脚本演示设备数据查询过程,开发者可参考一下思路使用JAVA、nodejs等其他语言实现。
业务应用实现设备数据查询参考如下流程:
调用设备管理平台的openapi获得设备列表
选择指定设备获取设备详情信息及产品productKey
使用productKey查询设备物模型并缓存在程序中
使用物模型中定义的设备属性,生成查询条件
调用TSDB接口查询设备数据
# 调用设备管理接口python3示例
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
本示例代码演示如何通过api接口获取物联网平台的各项数据
'''
import json
import requests
import hashlib
import hmac
import urllib
import time
import string
import datetime
from AIoTbce.auth.bce_v1_signer import sign
from baidubce.auth.bce_credentials import BceCredentials
from baidubce import utils
AUTHORIZATION = "authorization"
BCE_PREFIX = "x-bce-"
DEFAULT_ENCODING = 'UTF-8'
# POC
AK = "c590c33583------e9a6d9fef6fe26"
SK = "f2b0452cc5------5788f484617dc0c"
# 请求物联网平台openapi接口签名
def do_sign(params):
# 1.AK/SK、host、method、URL绝对路径、querystring
host = params["host"]
method = params["method"]
query = params["query"]
URI = params["URI"]
# 2.x-bce-date
x_bce_date = time.gmtime()
x_bce_date = time.strftime('%Y-%m-%dT%H:%M:%SZ', x_bce_date)
# 3.header和signedHeaders
header = {
"Host": host,
"content-type": "application/json;charset=utf-8",
"x-bce-date": x_bce_date
}
signedHeaders = "content-type;host;x-bce-date"
# 4.认证字符串前缀
authStringPrefix = "bce-auth-v1" + "/" + AK + "/" + x_bce_date + "/" + "1800"
# 5.生成CanonicalRequest
# 5.1生成CanonicalURI
# windows下为urllib.parse.quote,Linux下为urllib.quote
CanonicalURI = urllib.parse.quote(URI)
# 5.2生成CanonicalQueryString
CanonicalQueryString = utils.get_canonical_querystring(
query, True).decode('utf-8') # 如果您调用的接口的query比较复杂的话,需要做额外处理
# 5.3生成CanonicalHeaders
result = []
for key, value in header.items():
tempStr = str(urllib.parse.quote(key.lower(), safe="")) + \
":" + str(urllib.parse.quote(value, safe=""))
result.append(tempStr)
result.sort()
CanonicalHeaders = "\n".join(result)
# 5.4拼接得到CanonicalRequest
CanonicalRequest = method + "\n" + CanonicalURI + \
"\n" + CanonicalQueryString + "\n" + CanonicalHeaders
# 6.生成signingKey
signingKey = hmac.new(SK.encode('utf-8'),
authStringPrefix.encode('utf-8'), hashlib.sha256)
# 7.生成Signature
Signature = hmac.new((signingKey.hexdigest()).encode(
'utf-8'), CanonicalRequest.encode('utf-8'), hashlib.sha256)
# 8.生成Authorization并放到header里
header['Authorization'] = authStringPrefix + "/" + \
signedHeaders + "/" + Signature.hexdigest()
# 9.发送API请求并接受响应
url = "http://"+host + URI
# print('ready rq==============', url, header)
return [url, method, query, header, params]
def doreq(params):
sign_info = do_sign(params)
if sign_info[1] == 'GET':
r = requests.get(
sign_info[0], params=sign_info[2], headers=sign_info[3], verify=False)
elif sign_info[1] == 'POST':
r = requests.post(sign_info[0], headers=sign_info[3],
data=json.dumps(sign_info[4]), verify=False)
else:
return
print(r)
# print(r.text)
res = json.loads(r.text)
# print(res)
# for device in res['result'] :
# print('--------------------------------------')
# print(device)
# 日志文件
file = "log.txt"
with open(file, 'a+') as f:
f.write(r.text+'\n') # 加\n换行显示
return res
if __name__ == "__main__":
start = time.time()
# 物联网平台实例ID
instanceId = 'bsx110sdsfb5ihc8'
# 调用设备获取设备列表
body = {}
query = {
"pageNo": 1,
"pageSize": 50
}
params = {
"host": "180.76.236.1:8371",
"method": "GET",
"URI": "/v1/devices/"+instanceId,
"query": query,
"body": body
}
deviceList = doreq(params)
print(deviceList)
print('-----------------------------------------------------------------')
# 调用获取设备物模型详情(示例中选取了第一个设备)
productKey = deviceList['result'][0]['productKey']
body = {}
query = {}
params = {
"host": "180.76.236.1:8371",
"method": "POST",
"URI": f"/v1/products/{instanceId}/{productKey}/features/detail",
"query": query,
"body": body
}
features = doreq(params)
print(features)
for property in features['properties']:
print(property)
print(features['properties'][property])
print('-----------------------------------------------------------------')
print("耗时===========================", time.time()-start)
# 调用TSDB数据查询接口查询设备历史数据python3示例
根据设备管理接口返回的设备属性列表查询设备数据
import AIoTbce.protocol
from baidubce.bce_client_configuration import BceClientConfiguration
from baidubce.auth.bce_credentials import BceCredentials
from baidubce.services.tsdb.tsdb_client import TsdbClient
import time
import datetime
import json
# when use https as the protocol, you may find certificate expire problem, this can be resovled by adding the following lines
# import ssl
# ssl._create_default_https_context = ssl._create_unverified_context
def change_type(byte):
if isinstance(byte, bytes):
return str(byte, encoding="utf-8")
return json.JSONEncoder.default(byte)
##########必填配置#############
# 用户的时序数据库域名,形式如databasename.tsdb.iot.gz.baidubce.com
HOST = 'iotxxxxx021.tsdb-max8bnu8nywt.tsdb.iot.gz.baidubce.com'
AK = '989962xxxxx38ef2ee457b28fe3e' # 用户 Access Key ID
SK = '4939c4xxxxx4ef3b9723d32ac3854c8' # 用户 Secret Access Key
###########可选配置#############
# 使用HTTP协议
protocol = baidubce.protocol.HTTP
# 使用HTTPS协议
# protocol= baidubce.protocol.HTTPS
connection_timeout_in_mills = None # 连接超时时间
send_buf_size = None # 发送缓冲区大小
recv_buf_size = None # 接收缓冲区大小
retry_policy = None # 重试策略
# 生成config对象
config = BceClientConfiguration(
credentials=BceCredentials(AK, SK),
endpoint=HOST,
protocol=protocol,
connection_timeout_in_mills=connection_timeout_in_mills,
send_buf_size=send_buf_size,
recv_buf_size=recv_buf_size,
retry_policy=retry_policy)
# 创建TsdbCient
tsdb_client = TsdbClient(config)
# 查询单个属性记录
curtime = int(round(time.time() * 1000))
query_list = [{
"metric": "pocdemo",
"field": "temperature", # 属性标识
"filters": {
"start": curtime-24*60*60*1000, # 开始时间
"end": curtime, # 截止时间
"tags": {
"deviceName": ["ht001"], # 设备标识
"productKey":["dht22"] # 产品标识
},
"value": ">= 0"
},
"limit": 1000
}]
result = tsdb_client.get_datapoints(query_list)
print(result.results)
print('-----------------------------------------------------------------')
# 查询设备多个属性记录
query_list = [{
"metric": "pocdemo",
"fields": ["temperature", "humidity", "location"], # 多个属性标识
"filters": {
"start": curtime-24*60*60*1000, # 开始时间
"end": curtime, # 截止时间
"tags": {
"deviceName": ["ht001"], # 设备标识
"productKey":["dht22"], # 产品标识
"type":["property"] # 属性标识
}
},
"limit": 1000
}]
result = tsdb_client.get_datapoints(query_list)
print(result.results)
# print(json.dumps(result.results, ensure_ascii=False))
print('-----------------------------------------------------------------')
# 查询设备最后一次上报状态
query_list = [{
"metric": "pocdemo",
"fields": ["temperature", "humidity", "location", "last_time"], # 查询多个域
"filters": {
"start": 1597507200000, # 最后一次上报消息的时间戳(固定值)
"end": 1597507200000,
"tags": {
"deviceName": ["ht001"],
"productKey":["dht22"],
"type":["last_state"] # 最后一次上报消息标识
}
},
"limit": 1000
}]
result = tsdb_client.get_datapoints(query_list)
print(type(result.results))
print(result.results[0])
res = result.results[0]
print(type(res))
data = {
'timestamp':0,
'properties':{}
}
for field in range(0,len(res.fields)):
if res.fields[field] == 'last_time' :
data['timestamp'] = res.groups[0].values[0][field+1]
else :
data['properties'][res.fields[field]] = res.groups[0].values[0][field+1]
# print(res.fields[field])
# print(res.groups[0].values[0][field+1])
print(data)
print('---------')
上一篇: 使用G+link协议进行通信 下一篇: 使用C语言开发设备端示例代码