diff --git a/packages/microservices/server/server-mqtt.ts b/packages/microservices/server/server-mqtt.ts index 99e4d2b94..37950bd75 100644 --- a/packages/microservices/server/server-mqtt.ts +++ b/packages/microservices/server/server-mqtt.ts @@ -90,10 +90,21 @@ export class ServerMqtt extends Server { const registeredPatterns = [...this.messageHandlers.keys()]; registeredPatterns.forEach(pattern => { - const { isEventHandler } = this.messageHandlers.get(pattern)!; + const handler = this.messageHandlers.get(pattern)!; + const { isEventHandler, extras } = handler; + + const globalSubscribeOptions = this.getOptionsProp( + this.options, + 'subscribeOptions', + ); + const subscribeOptions = + extras?.qos !== undefined + ? { ...globalSubscribeOptions, qos: extras.qos } + : globalSubscribeOptions; + mqttClient.subscribe( isEventHandler ? pattern : this.getRequestPattern(pattern), - this.getOptionsProp(this.options, 'subscribeOptions'), + subscribeOptions, ); }); } diff --git a/packages/microservices/test/server/server-mqtt.spec.ts b/packages/microservices/test/server/server-mqtt.spec.ts index 9d0ea357f..036c0d3f8 100644 --- a/packages/microservices/test/server/server-mqtt.spec.ts +++ b/packages/microservices/test/server/server-mqtt.spec.ts @@ -93,6 +93,98 @@ describe('ServerMqtt', () => { server.bindEvents(mqttClient); expect(subscribeSpy.calledWith(pattern)).to.be.true; }); + + describe('per-handler QoS via extras.qos', () => { + it('should use extras.qos=2 when handler specifies qos 2', () => { + const pattern = 'alerts/critical'; + const handler = Object.assign(sinon.spy(), { extras: { qos: 2 } }); + untypedServer.messageHandlers = objectToMap({ + [pattern]: handler, + }); + server.bindEvents(mqttClient); + expect(subscribeSpy.calledOnce).to.be.true; + expect(subscribeSpy.firstCall.args[0]).to.equal(pattern); + expect(subscribeSpy.firstCall.args[1]).to.deep.equal({ qos: 2 }); + }); + + it('should use extras.qos=0 when handler specifies qos 0', () => { + const pattern = 'telemetry/data'; + const handler = Object.assign(sinon.spy(), { extras: { qos: 0 } }); + untypedServer.messageHandlers = objectToMap({ + [pattern]: handler, + }); + server.bindEvents(mqttClient); + expect(subscribeSpy.calledOnce).to.be.true; + expect(subscribeSpy.firstCall.args[0]).to.equal(pattern); + expect(subscribeSpy.firstCall.args[1]).to.deep.equal({ qos: 0 }); + }); + + it('should use global subscribeOptions when extras.qos is undefined', () => { + const globalQos = 1; + const serverWithOptions = new ServerMqtt({ + subscribeOptions: { qos: globalQos }, + }); + const untypedServerWithOptions = serverWithOptions as any; + const pattern = 'events/general'; + const handler = Object.assign(sinon.spy(), { extras: {} }); + untypedServerWithOptions.messageHandlers = objectToMap({ + [pattern]: handler, + }); + serverWithOptions.bindEvents(mqttClient); + expect(subscribeSpy.calledOnce).to.be.true; + expect(subscribeSpy.firstCall.args[0]).to.equal(pattern); + expect(subscribeSpy.firstCall.args[1]).to.deep.equal({ + qos: globalQos, + }); + }); + + it('should override only qos while preserving other global subscribeOptions', () => { + const serverWithOptions = new ServerMqtt({ + subscribeOptions: { qos: 1, nl: true, rap: false }, + }); + const untypedServerWithOptions = serverWithOptions as any; + const pattern = 'commands/run'; + const handler = Object.assign(sinon.spy(), { extras: { qos: 2 } }); + untypedServerWithOptions.messageHandlers = objectToMap({ + [pattern]: handler, + }); + serverWithOptions.bindEvents(mqttClient); + expect(subscribeSpy.calledOnce).to.be.true; + expect(subscribeSpy.firstCall.args[1]).to.deep.equal({ + qos: 2, + nl: true, + rap: false, + }); + }); + + it('should apply different qos per handler when multiple handlers exist', () => { + const serverWithOptions = new ServerMqtt({ + subscribeOptions: { qos: 1 }, + }); + const untypedServerWithOptions = serverWithOptions as any; + + const handler0 = Object.assign(sinon.spy(), { extras: { qos: 0 } }); + const handler1 = Object.assign(sinon.spy(), { extras: {} }); + const handler2 = Object.assign(sinon.spy(), { extras: { qos: 2 } }); + + untypedServerWithOptions.messageHandlers = objectToMap({ + 'telemetry/+': handler0, + 'events/#': handler1, + 'alerts/critical': handler2, + }); + + serverWithOptions.bindEvents(mqttClient); + + expect(subscribeSpy.callCount).to.equal(3); + + const calls = subscribeSpy.getCalls(); + const callMap = new Map(calls.map(c => [c.args[0], c.args[1]])); + + expect(callMap.get('telemetry/+')).to.deep.equal({ qos: 0 }); + expect(callMap.get('events/#')).to.deep.equal({ qos: 1 }); + expect(callMap.get('alerts/critical')).to.deep.equal({ qos: 2 }); + }); + }); }); describe('getMessageHandler', () => { it(`should return function`, () => {