实际的业务应用中,经常会遇到从第三方系统中采集设备数据的场景。这种设备已经通过某种方式接入本地系统或其他IoT平台,希望将设备通过系统打通的方式进行的设备接入我们称之为云云对接。
云云对接需要开发一个消息代理服务将数据从原平台同步到平台,同时将平台控制指令通过接口方式回传给原系统,实现双向数据交互,本质上消息代理服务是一个虚拟网关或软网关
# 接入流程
云云对接在数据交互流程中与直连设备和网关设备交互一致,平台提供多种云云对接方案,用户可以根据需要进行选择;
方式 | 使用场景 | 优缺点 |
---|---|---|
网关设备 | 少量设备且消息频率低的场景 | 优点: 与普通网关接入方式一致,使用MQTT协议,便于维护 缺点: 第三方系统中的设备在平台中均注册为子设备类型 设备数量不能超过1000个 支持的消息量小,消息并发不超过100QPS |
网关网桥 | 大量设备且消息频率较高的场景 | 优点: 使用AMQP协议,吞吐量大 可以对接多个第三方系统,每个系统对应一个网关 缺点: 第三方系统中的设备在平台中均注册为子设备类型 设备数量不能超过1000个 |
实例网桥 | 大量设备且消息频率较高的场景 | 优点: 使用AMQP协议,吞吐量大 第三方系统中的设备类型可以与平台一一对应 缺点: 平台只提供一个实例级网桥,只能对提供给一个第三方系统 |
# 设备与平台交互
数据交互层上云云对接与网关代理子设备接入一致,以下重点讲解使用AMQP协议网桥接入的流程和方法。
# 使用网关设备实现云云对接
参考《网关与子设备接入》章节,将第三方系统中的设备注册为物联网平台的子设备,使用网关代理消息交互。
# 使用网关网桥实现云云对接
- 在平台中将第三方系统中需要对接的产品和设备注册为子产品和子设备
- 创建网关并在控制台网关详情页开启网关网桥
- 获取网桥连接配置信息
配置信息示例
{
"username":"yk8z3qqz805vjcqi",
"password":"jdh1142d31r6",
"vhost":"/",
"host":"120.48.6.179",
"port":8672,
"sourceQueue":"zasr9fjmk53aur26_cgateway_sncbox202112280001_source_58gza1tg",
"exchange":"zasr9fjmk53aur26_cgateway_sncbox202112280001_writable_exchange",
"routeKey":"1p28v",
"state":true
}
- 使用获得的密钥通过AMQP协议连接平台,可以使用任意RabbitMQ客户端实现连接及消息消费和生产
开源RabbitMQ AMQP协议支持的多语言或框架SDK
语言或框架 | SDK |
---|---|
Java | RabbitMQ Java Client Library (opens new window) |
Spring Framework | Spring AMQP project for Java (opens new window) |
.NET | .NET SDK (opens new window) |
Python | Python SDK (opens new window) |
PHP | PHP SDK (opens new window) |
Rust | Rust SDK (opens new window) |
C and C++ | C and C++ SDK (opens new window) |
Go | Go SDK (opens new window) |
JavaScript and Node | JavaScript and Node SDK (opens new window) |
Objective-C and Swift | Objective-C and Swift SDK (opens new window) |
其他 | 更多信息,请参见Clients Libraries and Developer Tools (opens new window) |
- 按G+link协议规范与平台进行数据交互
- 数据封装协议
AMQP作为通信协议,交互协议与G+link保持一致,区别在于将MQTT协议中的topic作为header的参数进行传输
headers: 包含当前网关的接口topic,例如thing/cgateway/sncbox202112280001/event/post,必须使用网关的topic,平台会进行校验,没有权限的topic数据将被丢弃
body: link协议对应接口的payload作为message,使用protobuf压缩编码(proto版本 3.15),使用网关网桥时,payload中必须包含子设备标识
proto格式如下:
message.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.AIoT.bce.dmp.proto.amqp";
option java_outer_classname = "AMQPMessageProto";
package amqp;
import "model.proto";
// Route Message definition
message Message {
// message format version, current is 'v1' 可以不关注
string apiVersion = 1;
// message qos ,当前只支持0 可以不关注
int32 qos = 2;
// metadata for the message
model.Metadata metadata = 3;
// user specified message
bytes message = 4;
}
model.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.AIoT.bce.dmp.proto.model";
option java_outer_classname = "ModelProto";
// This proto contains commons models for different proto file
package model;
// The metadata of message
message Metadata {
// The unique id for the message 可以不关注
string uid = 1;
// The creation timestamp of the message
int64 createTime = 2;
// The tags of the message
map labels = 3;
}
如何生成proto产出的pojo,请搜索protoc相关内容以及了解protobuf
- 设备消息上报Java示例代码
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.AIoT.bce.dmp.proto.amqp.Message;
import com.baidu.bce.dmp.proto.model.Metadata;
import com.google.protobuf.ByteString;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TestC2C {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("host"); // 5中的host
factory.setUsername("username"); // 5中的username
factory.setPassword("password"); // 5中的password
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
Metadata.Builder mb = Metadata.newBuilder();
mb.setCreateTime(System.currentTimeMillis());
Message.Builder b = Message.newBuilder();
b.setMetadata(mb.build());
b.setMessage(ByteString.copyFrom("这里是你设备产生的实际数据", StandardCharsets.UTF_8));
Map headers = new HashMap<>();
headers.put("topic", "thing/{product}/{deviceName}/property/post"); // 这里根据你的产品Key和设备名去构造G+link相关的topic(G+link有很多类型)
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.headers(headers)
.build();
channel.basicPublish(
"writable_exchange", // 5中的exchange
"routeKey", // 5中的routeKey
properties,
b.build().toByteArray());
}
}
- 接收平台下发指令示例代码
import java.io.IOException;
import com.AIoT.bce.dmp.proto.amqp.Message;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Consumer extends DefaultConsumer {
public Consumer(Channel channel) {
super(channel);
}
@Override
public void handleConsumeOk(String consumerTag) {
super.handleConsumeOk(consumerTag);
}
@Override
public void handleCancelOk(String consumerTag) {
super.handleCancelOk(consumerTag);
}
@Override
public void handleCancel(String consumerTag) throws IOException {
super.handleCancel(consumerTag);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
// 交换机
String exchange = envelope.getExchange();
System.out.println("----- exchange -----");
System.out.println(consumerTag);
System.out.println(exchange);
// 路由key
System.out.println("----- routeKey -----");
String routingKey = envelope.getRoutingKey();
System.out.println(routingKey);
// 消息Id mq在channel中用来标识消息的id,可用于确认消息已接受
System.out.println("----- headers -----");
long deliveryTag = envelope.getDeliveryTag();
System.out.println(deliveryTag);
System.out.println(properties);
// 消息内容
System.out.println("----- msg -----");
System.out.println(Message.parseFrom(body));
}
}
# 使用实例网桥实现云云对接
- 在平台中注册第三方系统中需要对接的产品和设备,设备类型与第三方系统中一致
- 在控制台服务管理中开启实例网桥并获得网桥连接配置密钥
- 使用获得的密钥通过AMQP协议连接平台
- 按link协议规范与平台进行数据交互
使用实例网桥时,与网关网桥的区别在于headers中的topic可以是网关设备、直连设备、子设备的对应任意设备类型G+link接口,相应的body中的message与对应G+link接口中的payload一致
可以理解为是将MQTT协议中的topic放置在headers中的topic位置,payload放置在body的message位置 参考《直连设备接入》、《网关与子设备接入》及《设备管理>数据交互协议》章节进行了解