企业信息化系统物联网设备管理与数据采集
物联网应用概述
物联网(IoT)技术正在深刻改变企业信息化系统的数据采集方式和业务监控能力。通过将传感器、执行器、智能设备接入企业信息化平台,可以实现生产设备监控、环境参数采集、能源管理等智能化应用。本文介绍EIMS系统中物联网设备管理的技术实现方案。
系统架构设计
物联网数据采集系统采用四层架构:
| 层次 | 功能描述 | 技术选型 |
|---|---|---|
| 设备层 | 各类传感器、网关、智能设备 | Modbus TCP/RTU、MQTT |
| 采集层 | 协议转换、数据预处理、边缘计算 | Node-RED、IoT Edge |
| 平台层 | 设备管理、消息路由、数据存储 | EMQX、Redis、InfluxDB |
| 应用层 | 数据展示、告警通知、决策分析 | EIMS + Grafana |
核心功能实现
1. MQTT 消息接入
设备通过 MQTT 协议接入平台:
// MQTT 消息接入服务
const mqtt = require('mqtt');
const Influx = require('influx');
class IoTGateway {
constructor() {
this.client = null;
this.influx = new Influx.InfluxDB({
host: 'localhost',
database: 'iot_data',
schema: [
{
measurement: 'sensor_data',
tags: ['device_id', 'device_type', 'location'],
fields: ['temperature', 'humidity', 'pressure', 'value']
}
]
});
}
// 连接 MQTT Broker
connect() {
this.client = mqtt.connect('mqtt://localhost:1883', {
clientId: 'eims-gateway',
username: 'eims',
password: 'password',
clean: true
});
this.client.on('connect', () => {
console.log('MQTT Gateway connected');
// 订阅设备数据主题
this.client.subscribe('eims/+/data', { qos: 1 });
this.client.subscribe('eims/+/status', { qos: 1 });
});
this.client.on('message', (topic, message) => {
this.handleMessage(topic, message);
});
}
// 处理设备消息
async handleMessage(topic, message) {
const parts = topic.split('/');
const deviceId = parts[1];
const msgType = parts[2];
const payload = JSON.parse(message.toString());
if (msgType === 'data') {
await this.processSensorData(deviceId, payload);
} else if (msgType === 'status') {
await this.processDeviceStatus(deviceId, payload);
}
}
// 处理传感器数据
async processSensorData(deviceId, data) {
// 解析设备信息
const device = await this.getDeviceInfo(deviceId);
// 写入时序数据库
await this.influx.writePoints([{
measurement: 'sensor_data',
tags: {
device_id: deviceId,
device_type: device.type,
location: device.location
},
fields: {
temperature: data.temperature || 0,
humidity: data.humidity || 0,
pressure: data.pressure || 0,
value: data.value || 0
},
timestamp: new Date(data.timestamp || Date.now())
}]);
// 检查告警阈值
await this.checkThreshold(deviceId, data);
// 更新设备状态
await this.updateDeviceState(deviceId, 'online', data);
}
// 检查告警阈值
async checkThreshold(deviceId, data) {
const device = await this.getDeviceInfo(deviceId);
const alerts = [];
if (device.thresholds.temperature) {
if (data.temperature > device.thresholds.temperature.max) {
alerts.push({ type: 'temperature_high', value: data.temperature });
}
if (data.temperature < device.thresholds.temperature.min) {
alerts.push({ type: 'temperature_low', value: data.temperature });
}
}
if (alerts.length > 0) {
await this.sendAlerts(deviceId, alerts);
}
}
}
2. 设备注册与管理
统一管理物联网设备:
// 设备管理服务
class DeviceManager {
constructor(db) {
this.db = db;
}
// 注册新设备
async registerDevice(deviceInfo) {
const device = {
device_id: deviceInfo.deviceId,
device_name: deviceInfo.name,
device_type: deviceInfo.type,
location: deviceInfo.location,
protocol: deviceInfo.protocol || 'mqtt',
status: 'offline',
config: JSON.stringify(deviceInfo.config || {}),
thresholds: JSON.stringify(deviceInfo.thresholds || {}),
created_at: new Date(),
updated_at: new Date()
};
await this.db.collection('devices').insertOne(device);
// 初始化设备状态
await this.db.collection('device_states').insertOne({
device_id: deviceInfo.deviceId,
online: false,
last_online: null,
last_data: null
});
return device;
}
// 获取设备列表
async getDevices(filters = {}) {
const query = {};
if (filters.type) query.device_type = filters.type;
if (filters.location) query.location = filters.location;
if (filters.status) query.status = filters.status;
return await this.db.collection('devices').find(query).toArray();
}
// 批量导入设备
async batchRegister(devices) {
const results = [];
for (const device of devices) {
try {
const result = await this.registerDevice(device);
results.push({ deviceId: device.deviceId, success: true, result });
} catch (error) {
results.push({ deviceId: device.deviceId, success: false, error: error.message });
}
}
return results;
}
// 远程配置设备
async configureDevice(deviceId, config) {
const device = await this.getDevice(deviceId);
if (!device) throw new Error('Device not found');
// 发送配置命令到设备
const command = {
cmd: 'config_update',
config: config,
timestamp: Date.now()
};
await this.publishToDevice(deviceId, command);
return { success: true, deviceId };
}
// 远程重启设备
async rebootDevice(deviceId) {
const command = {
cmd: 'reboot',
timestamp: Date.now()
};
await this.publishToDevice(deviceId, command);
return { success: true, deviceId };
}
}
3. 实时数据可视化
集成 Grafana 实现数据大屏展示:
// 数据查询 API
router.get('/api/iot/realtime/:deviceId', async (ctx) => {
const { deviceId } = ctx.params;
const { timeRange = '1h' } = ctx.query;
const range = {
'5m': '5m',
'1h': '1h',
'24h': '24h',
'7d': '7d'
}[timeRange];
const result = await influx.query(`
SELECT last(temperature) as temperature,
last(humidity) as humidity,
last(pressure) as pressure
FROM sensor_data
WHERE device_id = '${deviceId}'
AND time > now() - ${range}
`);
ctx.body = {
deviceId,
timeRange,
data: result[0] || {}
};
});
// 历史数据查询 API
router.get('/api/iot/history/:deviceId', async (ctx) => {
const { deviceId } = ctx.params;
const { startTime, endTime, interval = '5m' } = ctx.query;
const result = await influx.query(`
SELECT mean(temperature) as temperature,
mean(humidity) as humidity,
mean(pressure) as pressure
FROM sensor_data
WHERE device_id = '${deviceId}'
AND time >= '${startTime}'
AND time <= '${endTime}'
GROUP BY time(${interval})
`);
ctx.body = {
deviceId,
startTime,
endTime,
data: result
};
});
// 聚合统计 API
router.get('/api/iot/statistics/:deviceId', async (ctx) => {
const { deviceId } = ctx.params;
const { period = '24h' } = ctx.query;
const stats = await influx.query(`
SELECT mean(temperature) as avg_temp,
max(temperature) as max_temp,
min(temperature) as min_temp,
mean(humidity) as avg_humidity,
mean(pressure) as avg_pressure
FROM sensor_data
WHERE device_id = '${deviceId}'
AND time > now() - ${period}
`);
ctx.body = {
deviceId,
period,
statistics: stats[0] || {}
};
});
典型应用场景
物联网在企业信息化中的典型应用:
- 生产设备监控:实时监控机床、PLC、机器人等设备的运行状态和工艺参数
- 环境监测:监测车间温湿度、空气质量、光照强度,保障生产环境
- 能源管理:采集电力、燃气、压缩空气等能源消耗数据,优化能源成本
- 智能仓储:使用 RFID 和温湿度传感器实现库存可视化管理
- 安防监控:集成视频监控、门禁、烟雾报警等安全防范系统
数据处理策略
针对大规模物联网数据的处理策略:
| 策略 | 说明 | 适用场景 |
|---|---|---|
| 数据过滤 | 边缘端过滤无效数据和异常值 | 所有场景 |
| 数据聚合 | 按时间窗口聚合,减少存储量 | 历史数据查询 |
| 边缘计算 | 在网关端完成计算,减少云端压力 | 实时告警场景 |
| 冷热分离 | 实时数据存Redis,历史数据存InfluxDB | 大数据量场景 |
总结
物联网技术为企业信息化系统提供了实时感知能力,通过将各类设备接入平台,可以实现生产过程的透明化、管理决策的数据化。在实际落地时,需要根据业务需求选择合适的通信协议,合理规划数据采集频率,并做好数据存储和性能优化的规划。