AWS Lambda + Proteus POC - Service Side Connectivity Question


#1

I’m currently playing around with a POC where I’m invoking an AWS Lambda that uses the Java client to make a request out to a Spring Boot service via Proteus. It works quite well, except that after a while the Spring Boot app suddenly “drops” from sight, which causes my Lambda function to time out. In this state, I can see the Lambda client connect to Proteus; it just looks like there isn’t a service to fulfill the request (my SB app). I also can’t see the SB app in the Monitoring web page, and I don’t see any exceptions, reconnect messages, or anything on the SB app side like I would if I shut down Proteus.

Here’s my sample protobuf service IDL:

service MyService {
    rpc createWidget( CreateRequest ) returns ( CreateResponse ) {}
    rpc notifyWidgetPairing( PairingNotification ) returns ( Empty ) {}
}
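For context, a fuller .proto around that service would look roughly like the following — the message fields below are placeholders for illustration, not my real definitions:

```protobuf
syntax = "proto3";

package com.rest.proto;

// Placeholder message shapes - illustration only
message CreateRequest       { string name = 1; }
message CreateResponse      { string id = 1; }
message PairingNotification { string serial = 1; }
message Empty               {}

service MyService {
    rpc createWidget( CreateRequest ) returns ( CreateResponse ) {}
    rpc notifyWidgetPairing( PairingNotification ) returns ( Empty ) {}
}
```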

If I restart my Spring Boot app, everything is good again for a little while. Is there something I’m missing on the Spring Boot app side? Maybe my broker configuration? (I currently have defaults.) Perhaps I should be using a stream instead of Monos? All I want is to keep that SB service “alive” and accepting requests from my Lambda clients. I’m thinking it’s something simple I’m overlooking. Thanks in advance!

Note: My setup is a single-EC2 Docker Swarm box that both my service and Proteus run in.


#2

Hi,

Looking into reproducing this. How are you connecting to the broker - IP address, load balancers, etc.? Can you paste the configuration you’re using to start the broker?

Thanks,
Robert


#3

Thanks Robert. Yes, here’s what I’m using:

BROKER_SERVER_OPTS  = '-Dnetifi.broker.tcp.publicAddress=172.31.1.69' '-Dnetifi.authentication.0.accessKey=9007199254740991'  '-Dnetifi.broker.console.enabled=true' '-Dnetifi.authentication.0.accessToken=kTBDVtfRBO4tHOnZzSyY5ym2kfY=' '-Dnetifi.broker.admin.accessKey=9007199254740991' '-Dnetifi.broker.admin.accessToken=kTBDVtfRBO4tHOnZzSyY5ym2kfY='

Now, I first tried it without the “publicAddress”. My “publicAddress” is really my EC2’s internal IP, as I don’t have this POC exposed publicly, but I figured having that value be the same as the value my Lambda function is using might help. I also validated that my Security Groups allow traffic from the Lambda to the EC2 instance.

Essentially, this is what I see:

Lambda side:

[reactor-tcp-nio-4] INFO io.netifi.proteus.DefaultProteusBrokerService - selected socket WeightedClientTransportSupplier{errorPercentage=Ewma(value=1.0, age=1548472244430), socketAddress=172.31.1.69:8001, selectCount=1}
[reactor-tcp-nio-4] ERROR io.netifi.proteus.rsocket.WeightedReconnectingRSocket - proteus client received unhandled exception for connection with address 172.31.1.69:8001
io.rsocket.rpc.exception.ServiceNotFound: can not find service com.rest.proto.DeviceService
at io.rsocket.rpc.rsocket.RequestHandlingRSocket.requestResponse(RequestHandlingRSocket.java:63)

Broker side:

2019-01-26 04:29:55,821 INFO i.n.p.b.a.BrokerSocketAcceptor [reactor-tcp-server-epoll-6] new destination connection from access key 9007199254740991, destination 4e1ebdc9-4b3b-4b75-a2fc-4a82c90a9a18-2 and group rest.deviceService
2019-01-26 04:29:55,832 INFO i.n.p.b.a.BrokerSocketAcceptor [reactor-tcp-server-epoll-6] adding destination 4e1ebdc9-4b3b-4b75-a2fc-4a82c90a9a18-2 to group rest.deviceService
2019-01-26 04:29:55,833 INFO i.n.p.b.a.ClusterSocketAcceptor [reactor-tcp-server-epoll-6] adding destination 4e1ebdc9-4b3b-4b75-a2fc-4a82c90a9a18-2 to group rest.deviceService
2019-01-26 04:30:08,780 ERROR i.n.p.b.Broker [reactor-tcp-server-epoll-8] Server: unhandled exception
io.rsocket.exceptions.ApplicationErrorException: can not find service com.rest.proto.DeviceService
	at io.rsocket.exceptions.Exceptions.from(Exceptions.java:53) ~[rsocket-core-0.11.15.jar:?]
	at io.rsocket.RSocketClient.handleFrame(RSocketClient.java:464) ~[rsocket-core-0.11.15.jar:?]
	at io.rsocket.RSocketClient.handleIncomingFrames(RSocketClient.java:427) ~[rsocket-core-0.11.15.jar:?]
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.Flux.subscribe(Flux.java:7727) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:184) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:94) ~[rsocket-core-0.11.15.jar:?]
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:211) ~[reactor-netty-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:327) ~[reactor-netty-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:310) ~[reactor-netty-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141) ~[reactor-netty-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1429) ~[netty-handler-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1211) ~[netty-handler-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1245) ~[netty-handler-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:808) ~[netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar:4.1.29.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:410) ~[netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar:4.1.29.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:310) ~[netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar:4.1.29.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) ~[netty-common-4.1.29.Final.jar:4.1.29.Final]
	at java.lang.Thread.run(Thread.java:834) [?:?]
2019-01-26 04:30:22,743 ERROR i.n.p.b.Broker [reactor-tcp-server-epoll-6] Server: unhandled exception
io.rsocket.exceptions.ApplicationErrorException: can not find service com.rest.proto.DeviceService
	at io.rsocket.exceptions.Exceptions.from(Exceptions.java:53) ~[rsocket-core-0.11.15.jar:?]
	at io.rsocket.RSocketClient.handleFrame(RSocketClient.java:464) ~[rsocket-core-0.11.15.jar:?]
	at io.rsocket.RSocketClient.handleIncomingFrames(RSocketClient.java:427) ~[rsocket-core-0.11.15.jar:?]
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:211) ~[reactor-netty-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:327) ~[reactor-netty-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:310) ~[reactor-netty-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141) ~[reactor-netty-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1429) ~[netty-handler-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1211) ~[netty-handler-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1245) ~[netty-handler-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:808) ~[netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar:4.1.29.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:410) ~[netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar:4.1.29.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:310) ~[netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar:4.1.29.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) ~[netty-common-4.1.29.Final.jar:4.1.29.Final]
	at java.lang.Thread.run(Thread.java:834) [?:?]
disposing...
2019-01-26 04:31:22,239 INFO i.n.p.b.Broker [Thread-0] SHUTDOWN HOOK - Broker not disposed - shutting down
disposed...

And as I mentioned, on the Spring Boot side I don’t see any exceptions.


#4

Thanks for the information - this should make it easier to figure out what’s happening. Interestingly enough, this error is coming from the Spring Boot app. Something is happening where RSocket receives the request but can’t route it to a handler. That’s probably why it works after you reboot. We’ll let you know what we find.


#5

I edited my original response. Yes, I also suspect it is a Spring Boot integration issue.


#6

One more thing - we’re working on cleaning up our documentation, and maybe this is what you meant, but you can think of the public address as the address that a broker will advertise itself as. When you make a cluster, the broker sends the public address to clients, telling them where to connect. If you are in a Docker container, you could bind on address 1.2.3.4 but want the broker to tell clients to connect to the address of the host, 10.0.0.1.
It doesn’t necessarily mean an IP address you share with the public. It’s more like the publicly advertised address.
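Concretely, that split would look something like the fragment below. The publicAddress property matches what you already use; the bind-address property name here is an assumption on my part, so double-check it against the broker documentation before relying on it:

```
# Bind inside the container on one interface...
-Dnetifi.broker.tcp.address=1.2.3.4
# ...but advertise the host's address to connecting clients
-Dnetifi.broker.tcp.publicAddress=10.0.0.1
```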


#7

I figured the publicAddress didn’t have to be a public address, so thanks for that confirmation. I’m going to try your suggestion.

But in my case, the Lambda client already knows the broker’s IP, so what it gets back as an advertisement from the broker is ignored? I’m just guessing this from the fact that it was still able to communicate when my publicAddress was 127.0.0.1.


#8

Yes - if you have one broker it won’t try to connect to anything else most of the time. If the connection breaks, it will try to connect to the broker with the least connections from its point of view. It probably isn’t that big of an issue with a Lambda, depending on how long it lives. But it would be better, especially in a cluster, to set the public address to something a client could actually connect to, so that it can retry against different brokers if the current one fails, or if it needs to make more than one request. The Java client tries to connect to multiple brokers for performance and reliability reasons.
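As a rough sketch of that selection policy - this is illustrative logic only, not the actual Proteus client code, and the broker addresses are made-up example data:

```java
import java.util.Map;

public class LeastConnections {

    // Pick the broker that currently has the fewest connections
    // from this client's point of view.
    static String pick(Map<String, Integer> connectionsByBroker) {
        return connectionsByBroker.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElseThrow(IllegalStateException::new);
    }

    public static void main(String[] args) {
        // Example data: two brokers, the second is less loaded.
        Map<String, Integer> brokers = Map.of(
                "172.31.1.69:8001", 3,
                "172.31.1.70:8001", 1);
        System.out.println(pick(brokers)); // prints 172.31.1.70:8001
    }
}
```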


#9

Cool. I like it. Thanks.


#10

I tried an experiment today:

  1. Had my SB app running on the Docker Swarm, connected to the broker.
  2. In my IDE, set logging to DEBUG on packages io.netifi and io.rsocket and ran the same app locally (VPN set up to my VPC).
  3. Invoked the Lambda a few times and saw both the local app and the Docker app handle the requests (sort of a round-robin type of thing).
  4. Then turned off the Swarm app and left just the local app running.
  5. Made a few more calls until I got the “no service” error.

Here are the logs for the broker when it came up:


2019-01-29 02:07:05,169 INFO i.n.p.b.a.BrokerSocketAcceptor [reactor-tcp-server-epoll-7] new destination connection from access key 9007199254740991, destination pairRequestLambdaClient-0 and group rest.deviceService
2019-01-29 02:07:05,180 INFO i.n.p.b.a.BrokerSocketAcceptor [reactor-tcp-server-epoll-7] adding destination pairRequestLambdaClient-0 to group rest.deviceService
2019-01-29 02:07:05,180 INFO i.n.p.b.a.ClusterSocketAcceptor [reactor-tcp-server-epoll-7] adding destination pairRequestLambdaClient-0 to group rest.deviceService
2019-01-29 02:07:05,180 INFO i.n.p.b.a.BrokerSocketAcceptor [reactor-tcp-server-epoll-8] removing destination pairRequestLambdaClient-0 from group rest.deviceService
2019-01-29 02:07:05,180 INFO i.n.p.b.a.ClusterSocketAcceptor [reactor-tcp-server-epoll-8] removing destination pairRequestLambdaClient-0 to group rest.deviceService
2019-01-29 02:07:05,181 INFO i.n.p.b.a.BrokerSocketAcceptor [reactor-tcp-server-epoll-8] removing destination pairRequestLambdaClient-0 from destination map
2019-01-29 02:07:05,181 INFO i.n.p.b.a.BrokerSocketAcceptor [reactor-tcp-server-epoll-8] removing destination pairRequestLambdaClient-0 from group rest.deviceService load balancer
2019-01-29 02:07:05,450 ERROR i.n.p.b.Broker [reactor-tcp-server-epoll-5] Server: unhandled exception
io.rsocket.exceptions.ApplicationErrorException: can not find service com.rest.proto.DeviceService
	at io.rsocket.exceptions.Exceptions.from(Exceptions.java:53) ~[rsocket-core-0.11.15.jar:?]
	at io.rsocket.RSocketClient.handleFrame(RSocketClient.java:464) ~[rsocket-core-0.11.15.jar:?]
	at io.rsocket.RSocketClient.handleIncomingFrames(RSocketClient.java:427) ~[rsocket-core-0.11.15.jar:?]
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.Flux.subscribe(Flux.java:7727) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:184) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:94) ~[rsocket-core-0.11.15.jar:?]
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:211) ~[reactor-netty-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:327) ~[reactor-netty-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:310) ~[reactor-netty-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141) ~[reactor-netty-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1429) ~[netty-handler-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1211) ~[netty-handler-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1245) ~[netty-handler-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:808) ~[netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar:4.1.29.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:410) ~[netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar:4.1.29.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:310) ~[netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar:4.1.29.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) ~[netty-common-4.1.29.Final.jar:4.1.29.Final]
	at java.lang.Thread.run(Thread.java:834) [?:?]
2019-01-29 02:07:15,492 ERROR r.n.t.TcpServer [reactor-tcp-server-epoll-8] [id: 0x589fd336, L:/10.255.0.140:8001 - R:/10.255.0.2:56564] onUncaughtException(SimpleConnection{channel=[id: 0x589fd336, L:/10.255.0.140:8001 - R:/10.255.0.2:56564]})
javax.net.ssl.SSLException: handshake timed out
	at io.netty.handler.ssl.SslHandler.handshake(...)(Unknown Source) ~[netty-handler-4.1.29.Final.jar:4.1.29.Final]

I find the SSLException down at the bottom curious. Either way, I got nothing of use from the Spring app.

Last successful run on the SB app. BTW, this particular test uses the fire-and-forget scenario where I return the Empty.

2019-01-28 21:05:13.919 DEBUG 49720 --- [actor-tcp-nio-5] io.rsocket.FrameLogger                   : receiving -> Frame => Stream ID: 2 Type: REQUEST_RESPONSE Payload: metadata: "pairRequestLambdaClient-1rest.deviceServicerest.deviceServicecom.rest.proto.DeviceServicerequestDevicePairing" data: "
value1" 
2019-01-28 21:05:13.921 DEBUG 49720 --- [actor-tcp-nio-5] com.rest.portal.reactive.DeviceService   : received serial: "value1"

2019-01-28 21:05:13.921 DEBUG 49720 --- [actor-tcp-nio-5] io.rsocket.FrameLogger                   : sending -> Frame => Stream ID: 2 Type: COMPLETE Payload: 

Here to help where I can. Thanks!


#11

Did you get the error only after you turned off your Docker Swarm application? If it is on, do you still get the error? Are you using a JavaScript or Java Lambda (I assume Java)?


#12

Also - SSL is on by default, so if you’re connecting with a client without SSL you could get an error - although you would be getting it on setup.


#13

I’m using the Java client. I can share the source if you want, but it’s pretty much the same as the getting-started guide for the pure Java client.


#14

Wouldn’t hurt - I could try running the same thing.


#15

I did get the error after turning off the Swarm app, yes. I actually got one error after that, then a request went through to the local app, and then nothing.


#16

Here it is in a gist: https://gist.github.com/hsteidel/7a9876a50d7b5ffa6758559999f46cfc

You’ll see the dependencies I’m using and a condensed .proto.
Also, for my “:device-service-idl” Gradle subproject I’m using:

compile "io.netifi.proteus:proteus-client:1.5.3"
compile "com.google.protobuf:protobuf-java:3.6.1"

The Spring Boot side simply logs that serial field and returns the Empty:

import com.google.protobuf.Empty;
import com.rest.proto.DevicePairingRequest;
import io.netty.buffer.ByteBuf;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

@Slf4j
@Component
public class DeviceService implements com.rest.proto.DeviceService {

    @Override
    public Mono<Empty> requestDevicePairing(DevicePairingRequest message, ByteBuf metadata) {
        log.debug("received " + message);
        return Mono.empty();
    }
}

#17

There might be a bug here in the netifi.dispose() call.

Can you try not calling that and see what happens?


#18

Yes, I can try it again, but I actually added it as a “shot in the dark”, thinking it would help if I gracefully shut down. :slight_smile: Give me a few - I can try it here real quick.


#19

Alright, here we go.

  1. Updated my Lambda code to not call dispose()
  2. Restarted my local app
  3. Started invoking:
    Run 1: Pass
    Run 2: Pass
    Run 3: Fail (some sort of ClosedChannelException)
    Run 4: Fail (same as Run 3)
    Run 5: Fail (service-not-found error)

This time, I have the full broker logs.

https://drive.google.com/file/d/1VwcJoFiRnV47pme0fFUNHiQ5cG9vrKaR/view?usp=sharing


#20

:frowning: - OK - setting up a Lambda and whatnot now so we can debug this.