企业信息化管理系统

EIMS - 助力企业数字化转型

企业信息化系统物联网设备管理与数据采集

物联网应用概述

物联网(IoT)技术正在深刻改变企业信息化系统的数据采集方式和业务监控能力。通过将传感器、执行器、智能设备接入企业信息化平台,可以实现生产设备监控、环境参数采集、能源管理等智能化应用。本文介绍EIMS系统中物联网设备管理的技术实现方案。

系统架构设计

物联网数据采集系统采用四层架构:

层次 功能描述 技术选型
设备层 各类传感器、网关、智能设备 Modbus TCP/RTU、MQTT
采集层 协议转换、数据预处理、边缘计算 Node-RED、IoT Edge
平台层 设备管理、消息路由、数据存储 EMQX、Redis、InfluxDB
应用层 数据展示、告警通知、决策分析 EIMS + Grafana

核心功能实现

1. MQTT 消息接入

设备通过 MQTT 协议接入平台:

// MQTT 消息接入服务
const mqtt = require('mqtt');
const Influx = require('influx');

class IoTGateway {
  constructor() {
    this.client = null;
    this.influx = new Influx.InfluxDB({
      host: 'localhost',
      database: 'iot_data',
      schema: [
        {
          measurement: 'sensor_data',
          tags: ['device_id', 'device_type', 'location'],
          fields: ['temperature', 'humidity', 'pressure', 'value']
        }
      ]
    });
  }

  // 连接 MQTT Broker
  connect() {
    this.client = mqtt.connect('mqtt://localhost:1883', {
      clientId: 'eims-gateway',
      username: 'eims',
      password: 'password',
      clean: true
    });

    this.client.on('connect', () => {
      console.log('MQTT Gateway connected');
      // 订阅设备数据主题
      this.client.subscribe('eims/+/data', { qos: 1 });
      this.client.subscribe('eims/+/status', { qos: 1 });
    });

    this.client.on('message', (topic, message) => {
      this.handleMessage(topic, message);
    });
  }

  // 处理设备消息
  async handleMessage(topic, message) {
    const parts = topic.split('/');
    const deviceId = parts[1];
    const msgType = parts[2];

    const payload = JSON.parse(message.toString());

    if (msgType === 'data') {
      await this.processSensorData(deviceId, payload);
    } else if (msgType === 'status') {
      await this.processDeviceStatus(deviceId, payload);
    }
  }

  // 处理传感器数据
  async processSensorData(deviceId, data) {
    // 解析设备信息
    const device = await this.getDeviceInfo(deviceId);

    // 写入时序数据库
    await this.influx.writePoints([{
      measurement: 'sensor_data',
      tags: {
        device_id: deviceId,
        device_type: device.type,
        location: device.location
      },
      fields: {
        temperature: data.temperature || 0,
        humidity: data.humidity || 0,
        pressure: data.pressure || 0,
        value: data.value || 0
      },
      timestamp: new Date(data.timestamp || Date.now())
    }]);

    // 检查告警阈值
    await this.checkThreshold(deviceId, data);

    // 更新设备状态
    await this.updateDeviceState(deviceId, 'online', data);
  }

  // 检查告警阈值
  async checkThreshold(deviceId, data) {
    const device = await this.getDeviceInfo(deviceId);
    const alerts = [];

    if (device.thresholds.temperature) {
      if (data.temperature > device.thresholds.temperature.max) {
        alerts.push({ type: 'temperature_high', value: data.temperature });
      }
      if (data.temperature < device.thresholds.temperature.min) {
        alerts.push({ type: 'temperature_low', value: data.temperature });
      }
    }

    if (alerts.length > 0) {
      await this.sendAlerts(deviceId, alerts);
    }
  }
}

2. 设备注册与管理

统一管理物联网设备:

// 设备管理服务
class DeviceManager {
  constructor(db) {
    this.db = db;
  }

  // 注册新设备
  async registerDevice(deviceInfo) {
    const device = {
      device_id: deviceInfo.deviceId,
      device_name: deviceInfo.name,
      device_type: deviceInfo.type,
      location: deviceInfo.location,
      protocol: deviceInfo.protocol || 'mqtt',
      status: 'offline',
      config: JSON.stringify(deviceInfo.config || {}),
      thresholds: JSON.stringify(deviceInfo.thresholds || {}),
      created_at: new Date(),
      updated_at: new Date()
    };

    await this.db.collection('devices').insertOne(device);

    // 初始化设备状态
    await this.db.collection('device_states').insertOne({
      device_id: deviceInfo.deviceId,
      online: false,
      last_online: null,
      last_data: null
    });

    return device;
  }

  // 获取设备列表
  async getDevices(filters = {}) {
    const query = {};
    if (filters.type) query.device_type = filters.type;
    if (filters.location) query.location = filters.location;
    if (filters.status) query.status = filters.status;

    return await this.db.collection('devices').find(query).toArray();
  }

  // 批量导入设备
  async batchRegister(devices) {
    const results = [];
    for (const device of devices) {
      try {
        const result = await this.registerDevice(device);
        results.push({ deviceId: device.deviceId, success: true, result });
      } catch (error) {
        results.push({ deviceId: device.deviceId, success: false, error: error.message });
      }
    }
    return results;
  }

  // 远程配置设备
  async configureDevice(deviceId, config) {
    const device = await this.getDevice(deviceId);
    if (!device) throw new Error('Device not found');

    // 发送配置命令到设备
    const command = {
      cmd: 'config_update',
      config: config,
      timestamp: Date.now()
    };

    await this.publishToDevice(deviceId, command);
    return { success: true, deviceId };
  }

  // 远程重启设备
  async rebootDevice(deviceId) {
    const command = {
      cmd: 'reboot',
      timestamp: Date.now()
    };

    await this.publishToDevice(deviceId, command);
    return { success: true, deviceId };
  }
}

3. 实时数据可视化

集成 Grafana 实现数据大屏展示:

// 数据查询 API
router.get('/api/iot/realtime/:deviceId', async (ctx) => {
  const { deviceId } = ctx.params;
  const { timeRange = '1h' } = ctx.query;

  const range = {
    '5m': '5m',
    '1h': '1h',
    '24h': '24h',
    '7d': '7d'
  }[timeRange];

  const result = await influx.query(`
    SELECT last(temperature) as temperature,
           last(humidity) as humidity,
           last(pressure) as pressure
    FROM sensor_data
    WHERE device_id = '${deviceId}'
    AND time > now() - ${range}
  `);

  ctx.body = {
    deviceId,
    timeRange,
    data: result[0] || {}
  };
});

// 历史数据查询 API
router.get('/api/iot/history/:deviceId', async (ctx) => {
  const { deviceId } = ctx.params;
  const { startTime, endTime, interval = '5m' } = ctx.query;

  const result = await influx.query(`
    SELECT mean(temperature) as temperature,
           mean(humidity) as humidity,
           mean(pressure) as pressure
    FROM sensor_data
    WHERE device_id = '${deviceId}'
    AND time >= '${startTime}'
    AND time <= '${endTime}'
    GROUP BY time(${interval})
  `);

  ctx.body = {
    deviceId,
    startTime,
    endTime,
    data: result
  };
});

// 聚合统计 API
router.get('/api/iot/statistics/:deviceId', async (ctx) => {
  const { deviceId } = ctx.params;
  const { period = '24h' } = ctx.query;

  const stats = await influx.query(`
    SELECT mean(temperature) as avg_temp,
           max(temperature) as max_temp,
           min(temperature) as min_temp,
           mean(humidity) as avg_humidity,
           mean(pressure) as avg_pressure
    FROM sensor_data
    WHERE device_id = '${deviceId}'
    AND time > now() - ${period}
  `);

  ctx.body = {
    deviceId,
    period,
    statistics: stats[0] || {}
  };
});

典型应用场景

物联网在企业信息化中的典型应用:

数据处理策略

针对大规模物联网数据的处理策略:

策略 说明 适用场景
数据过滤 边缘端过滤无效数据和异常值 所有场景
数据聚合 按时间窗口聚合,减少存储量 历史数据查询
边缘计算 在网关端完成计算,减少云端压力 实时告警场景
冷热分离 实时数据存Redis,历史数据存InfluxDB 大数据量场景

总结

物联网技术为企业信息化系统提供了实时感知能力,通过将各类设备接入平台,可以实现生产过程的透明化、管理决策的数据化。在实际落地时,需要根据业务需求选择合适的通信协议,合理规划数据采集频率,并做好数据存储和性能优化的规划。

← 上一篇:企业信息化系统区块链技术应用与数据溯源 上篇:企业信息化系统AI智能客服设计与实现 →