useMqtt.ts 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. // useMqtt.ts
  2. import { ref, onUnmounted } from 'vue'
  3. import mqtt, { MqttClient, IClientOptions } from 'mqtt'
  4. import { ElMessage } from 'element-plus'
  /**
   * Signature of the optional per-message hook that `connect` accepts.
   *
   * `payload` is the value produced by the client's `message` handler: the
   * JSON-parsed payload when the raw bytes are valid JSON, otherwise the
   * payload's string form. It is therefore left untyped here.
   */
  type MessageCallback = (
    topic: string,
    payload: any
  ) => void
  // Base connection options applied to every client created by this module.
  // (Suggested: move these to environment variables, e.g. import.meta.env.VITE_MQTT_HOST.)
  //
  // NOTE(review): the credentials below are hard-coded in source. The password
  // looks like a signed token (three dot-separated base64url segments), so anyone
  // who can read the shipped bundle can read it. Consider loading it from
  // configuration and rotating this value.
  const MQTT_CREDENTIALS = {
    username: 'yanfan',
    password: 'eyJhbGciOiJIUzUxMiJ9.eyJsb2dpbl91c2VyX2tleSI6IjY0YmM2NjJlLWZhMjQtNGY1Ny1hOTk1LWZiMGM2YjNhYzI4OCJ9.9nxoDUNGTk1szRlZHHG0AcWZctLrzJ16UA5rsBagHNcD10PC-LIMTgAr2CK1Ppafa6cW5XPdn7RqBF6iZjHtww'
  }

  const BASE_OPTIONS: IClientOptions = {
    clean: true, // start without a persisted session
    reconnectPeriod: 5000, // milliseconds between reconnect attempts (per MQTT.js docs)
    connectTimeout: 10000, // milliseconds to wait for CONNACK (per MQTT.js docs)
    ...MQTT_CREDENTIALS
  }
  /**
   * Vue composable that owns one MQTT client per call.
   *
   * Returns the client ref, an `isConnected` flag, the most recent message as a
   * reactive `{ topic, payload }` value, and `connect` / `subscribe` / `publish` /
   * `destroy` helpers. The client is closed automatically when the calling
   * component unmounts.
   */
  export function useMqtt() {
    const client = ref<MqttClient | null>(null)
    const isConnected = ref(false)
    // Latest received message as `{ topic, payload }`; components can watch this.
    const message = ref<any>(null)
    // Optional hook supplied to `connect`; it is kept across later `connect` calls
    // that do not pass a new one.
    let customMessageCallback: MessageCallback | null = null

    /**
     * Opens a connection to `host`.
     *
     * @param host      Broker URL handed to `mqtt.connect`.
     * @param options   Overrides merged on top of `BASE_OPTIONS` and the generated client id.
     * @param onMessage Optional hook invoked with `(topic, parsedPayload)` for every message.
     */
    const connect = (host: string, options: IClientOptions = {}, onMessage?: MessageCallback) => {
      if (client.value && client.value.connected) {
        console.warn('当前实例已连接 MQTT,跳过初始化')
        return
      }
      if (onMessage) {
        customMessageCallback = onMessage
      }

      // Merge configuration; a random client id is generated unless `options` supplies one.
      const finalOptions = {
        ...BASE_OPTIONS,
        clientId: `web-${Math.random().toString(16).slice(2)}`,
        ...options
      }

      try {
        // A client that exists but is not connected (still connecting or in its
        // reconnect loop) would otherwise be overwritten below and keep retrying
        // and firing handlers with no remaining reference to stop it.
        if (client.value) {
          client.value.end(true)
          client.value = null
        }

        const instance = mqtt.connect(host, finalOptions)
        client.value = instance

        instance.on('connect', () => {
          isConnected.value = true
          ElMessage.success('MQTT 连接成功')
        })

        instance.on('error', (err) => {
          console.error('MQTT Error:', err)
          ElMessage.error(`MQTT 错误: ${err.message}`)
          isConnected.value = false
        })

        instance.on('close', () => {
          isConnected.value = false
        })

        // Every incoming message updates the reactive `message` ref and is then
        // forwarded to the optional hook.
        instance.on('message', (topic, payload) => {
          const text = payload.toString()
          let parsedData: any
          try {
            parsedData = JSON.parse(text)
          } catch {
            // Not JSON: expose the raw text instead.
            parsedData = text
          }
          message.value = { topic, payload: parsedData }
          if (customMessageCallback) {
            customMessageCallback(topic, parsedData)
          }
        })
      } catch (err) {
        // Caught values are not guaranteed to be Error instances.
        const reason = err instanceof Error ? err.message : String(err)
        ElMessage.error(`MQTT 初始化异常: ${reason}`)
        isConnected.value = false
      }
    }

    /**
     * Subscribes to `topic` at QoS 0. Requires an established connection;
     * otherwise the request is skipped and a warning is logged.
     */
    const subscribe = (topic: string) => {
      if (!client.value || !client.value.connected) {
        console.warn(`MQTT 未连接,已跳过订阅: ${topic}`)
        return
      }
      client.value.subscribe(topic, { qos: 0 }, (err) => {
        if (err) ElMessage.error(`订阅失败: ${err.message}`)
        else console.log(`已订阅: ${topic}`)
      })
    }

    /**
     * Publishes to `topic`. Strings are sent as-is; any other value is serialized
     * with `JSON.stringify`. Requires an established connection; otherwise the
     * message is skipped and a warning is logged.
     *
     * Note: the `message` parameter shadows the outer `message` ref inside this function.
     */
    const publish = (topic: string, message: string | object) => {
      if (!client.value || !client.value.connected) {
        console.warn(`MQTT 未连接,已跳过发布: ${topic}`)
        return
      }
      const payload = typeof message === 'string' ? message : JSON.stringify(message)
      client.value.publish(topic, payload, (err) => {
        if (err) ElMessage.error(`发布失败: ${err.message}`)
      })
    }

    /** Closes the current client, if any, and resets the connection state. */
    const destroy = () => {
      if (client.value) {
        client.value.end()
        client.value = null
        isConnected.value = false
      }
    }

    // Disconnect automatically when the owning component unmounts so the
    // connection does not outlive it.
    onUnmounted(() => {
      destroy()
    })

    return {
      client,
      isConnected,
      message, // components can watch this ref directly
      connect,
      subscribe,
      publish,
      destroy
    }
  }