Java/Spring @Destination client annotation causing an error

I am extending https://github.com/netifi/netifi-quickstart-spring code. I’d like to send a message from one client to another. When I’ve added @Destination(group = "quickstart.clients", destination = "client1") in my new client2 code then I have an error:

2019-07-05 15:05:06.688 ERROR 21639 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute CommandLineRunner
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:816)
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:797)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
	at com.netifi.quickstart.client.Main.main(Main.java:9)
Caused by: io.rsocket.exceptions.ApplicationErrorException: caught exception: java.lang.NullPointerException
message: The mapper returned a null value.
broker id: f775368b-efaa-403c-bc75-a53e95fb6d5a
peer broker id: null
source destination: client2
source group: quickstart.clients
source tags: [tag(com.netifi.destination=client2)]
target destination: null
target group: quickstart.clients
query tags: [tag(destination=client)]

	at io.rsocket.exceptions.Exceptions.from(Exceptions.java:45)
	at io.rsocket.RSocketClient.handleFrame(RSocketClient.java:502)
	at io.rsocket.RSocketClient.handleIncomingFrames(RSocketClient.java:463)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
	at reactor.core.publisher.Flux.subscribe(Flux.java:7799)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:184)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1510)
	at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389)
	at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:106)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
	at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
	at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
	at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:206)
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:322)
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:335)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:796)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:432)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:333)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
		at reactor.core.publisher.Mono.block(Mono.java:1494)
		at com.netifi.quickstart.client.Client2Runner.run(Client2Runner.java:40)
		at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:813)
		at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:797)
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
		at com.netifi.quickstart.client.Main.main(Main.java:9)

When I change it to @Group("quickstart.clients") it works - provided broker choose client1 from the group.

I’ve noticed that in netifi-spring-core-1.6.4-sources.jar!/com/netifi/spring/core/annotation/BrokerClientStaticFactory.java:311 there is a tag created:

 case DESTINATION:
        brokerSocket =
            brokerClient.groupServiceSocket(group, tags.and(Tags.of("destination", destination)));
        break;

but Netify broker uses a different key for a tag:
adding destination e9a87da1-f477-35a2-8902-35bd59e0018e with tags [tag(com.netifi.destination=client2)] to group quickstart.clients.

Instead of destination - com.netifi.destination. I suppose that is why I’ve got target destination: null

Am I doing sth wrong or this is a bug?

Note:
In destination-based routing guide and com.netifi.broker.BrokerClient.java "com.netifi.destination" key is used.

I believe there was a bug with this that got fixed. What version are you using?

The direct dependency is: com.netifi:netifi-spring-boot-starter:1.6.4.

And the possible bug found at com.netifi:netifi-spring-core:1.6.4:

Any update on this one?

I see that v. 1.6.7 still has a typo/bug:

We’ll someone take a look at this a look at this issue.

@szisiu, sorry for the long waiting time. The fix was pending in a different PR, so we ported it into 1.6.x. It will be available with 1.6.8 release.

Thanks for the patience.
Oleh

Great! Thank you, Guys!