Stateful Session Management

Hello,

in my project I want to have a stateful service that receives a stream of commands from a client. For each client, the service holds an integer value. Using the request stream, the client should be able to control that integer value (let’s say, to add or subtract some integer values), and the service would then emit the up-to-date value back to the client.

I had a look at the code presented at: How to create Stateful Functions using RSocket RPC service?

There, the session of the client is not managed. How do I manage the session? Is there a better solution than having a map of session IDs to manage the clients?

Hi,

Right now it would depend on how you route your requests. If you use “group” routing, it will load balance across servers, so there wouldn’t be a way to keep state. If you use tag-based routing, you could route to servers based on a tag and then use that to keep state. We are going to be adding more support for sessions in the future.

We can create a demo that shows how to do this with the current release.

Thanks,
Robert

Hey,

a demo of tag routing would be really amazing. And another question: if in my architecture I have several service instances connected to Proteus, and a client establishes a stream channel with Proteus, would Proteus then establish a stream connection to a single service instance? Or would it instead load-balance the client stream between the service instances?

Thank you so much for your help :)

Hi,

Since there were multiple people interested in this, I created a more sophisticated example than I had originally planned. Some of the functionality will be folded into the broker / broker client to make this transparent in the future.

I created a class called StatefulSocket. This class creates a session service using the DefaultSessionService service. The StatefulSocket creates a soft sticky connection to a particular service that is valid as long as there is an active RSocket connection. It uses an API available in the Netifi Broker that notifies you when a destination leaves. If either the client or the service is disconnected, or the StatefulSocket is closed, the session is closed. Each StatefulSession has a unique sessionId that can be used to tie together items in the session. The StatefulSocket acts like a normal RSocket, and you can pass it to an RSocket-RPC client etc. All requests from the StatefulSocket will go to the destination that it’s tied to for the socket’s lifetime.
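For the counter use case from the original question, the service side could keep its per-session state keyed by that sessionId. The following is only a minimal sketch under assumptions: SessionCounters and its methods are hypothetical names, not part of the Netifi API or of the demo, and wiring close() to the point where the session ends is left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of the Netifi demo): one integer per session id.
class SessionCounters {
    private final ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    // Applies a signed delta (positive adds, negative subtracts) and returns the new value.
    public int apply(String sessionId, int delta) {
        return counters.computeIfAbsent(sessionId, id -> new AtomicInteger()).addAndGet(delta);
    }

    // Drops the state for a session; meant to be called when the session is closed.
    public void close(String sessionId) {
        counters.remove(sessionId);
    }

    // Number of sessions that currently hold state.
    public int activeSessions() {
        return counters.size();
    }
}
```

Because every request of a session reaches the same service instance, a plain in-memory map like this is enough; removing the entry on close keeps the map from growing without bound.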

To get the demo to work right now you need to build the netifi-java develop branch locally. Here’s the branch:

We’ve been moving some things around with our builds - I will send information about a repository with a snapshot when it’s available. Here’s a link to the example:

Let me know if you have any questions.

Thanks,
Robert

hi @robertroeser I hope you are doing well.
I tried to test the stateful session today, but unfortunately I wasn’t able to build netifi-java because I need your netifyReadOnlyUsername and password.
Is it possible for you to share this information?
Another question is about pure RSocket: when do you think the stateful session will be available?
Thanks a lot for your help.
Best regards,
Badr

Hi @belkirdi,

We have a release planned for Monday (tomorrow) that will have the changes needed to make the sample code work.

Thanks,
Robert

Awesome, thanks for this update. What about pure RSocket, any plans soon?
Thanks a lot for your help
Badr

Hi-

Release 1.6.3 is finished and I pushed the changes to the GitHub repository listed above in the develop branch. You should be able to run the demo now.

Thanks,
Robert

Hey Robert, so the link above does not work. Anyway I’ve found the netifi-java repo. Cloned the devel branch and tried to build, yet I am also getting the netifyReadOnlyUsername error. Am I doing something wrong?

The link doesn’t work (https://github.com/netifi/netifi-stateful-socket) or you’re getting an error? What error are you getting? Everything should be public now.

@oe19fyfa - I had mavenLocal there so it might have only been working on my laptop. I removed that and pushed some updates.

@robertroeser Hey, thank you once again for your replies. So I was playing around with the example and at this point I have some questions.

As I could see, both the client and the service have DESTINATION_TAG hardcoded to com.netifi.destination. As far as I understand, this is what you meant by tagged routing: the client’s request will be routed to a service with a specific tag.

So I tried changing the DESTINATION_TAG of the service in com.netifi.quickstart.service.DefaultSessionService.java on line 24. I expected that the service would no longer get the client’s requests, as the tags do not match anymore. However, the example still works just fine.

Then I tried changing the client’s DESTINATION_TAG in com.netifi.quickstart.client.StatefulSocket.java to match the new DESTINATION_TAG of the service. When I ran this, the client could not find the service and timed out.

So my question is, what do DESTINATION_TAG on the client side and on the server side stand for? Where can I see the code related to tagged routing?

Thanks :)

Hi-

Under the covers in the broker there are two ways that things are routed. First, everything has a group; this is done for security purposes. The second is tags. When you connect you supply a group and tags. Some tags are added automatically, like the destination tag with the key com.netifi.destination. A destination is a unique instance of a service, regardless of the number of connections available. Unless you specify one, the destination id is generated. The way the stateful session works is that the server sends the client back its destination, i.e. the destination that points to the instance of the server where the session state resides. The client then uses this destination to route subsequent requests to the server. If that is changed then it won’t route back to the same place.

The routing code isn’t open sourced yet, but essentially it uses inverted bitmap indexing to allow fast routing. When you connect, it creates an index for each tag you connect with. So if you connected with the tags [a:1,b:1,c:1] you could route on any combination of those tags. If you route on a tag that is not present in that list, it disqualifies the destination. For example, routing on [a:1,d:1] wouldn’t match anything because the destination didn’t supply the tag [d:1].
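Since the real routing code is not public, the following is only an illustrative sketch of the general idea of bitmap indexing over tags, not the broker’s implementation. TagIndexSketch, connect and route are hypothetical names; each tag is represented as a plain "key:value" string and each destination as a bit position.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the broker's real routing code is not public.
class TagIndexSketch {
    // One bitmap per "key:value" tag; bit i is set when destination i connected with that tag.
    private final Map<String, BitSet> index = new HashMap<>();
    private int destinations = 0;

    // Registers a destination with its tags and returns its numeric id.
    public int connect(String... tags) {
        int id = destinations++;
        for (String tag : tags) {
            index.computeIfAbsent(tag, t -> new BitSet()).set(id);
        }
        return id;
    }

    // Returns the ids of all destinations that carry every requested tag.
    public BitSet route(String... tags) {
        BitSet candidates = new BitSet();
        candidates.set(0, destinations); // start with every known destination
        for (String tag : tags) {
            BitSet withTag = index.get(tag);
            if (withTag == null) {
                return new BitSet(); // a tag nobody supplied disqualifies everything
            }
            candidates.and(withTag); // keep only destinations that also have this tag
        }
        return candidates;
    }
}
```

With a destination connected as [a:1,b:1,c:1], route("a:1", "c:1") selects it, while route("a:1", "d:1") returns an empty set because no bitmap exists for d:1.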

Some more notes about Tag Based Routing:

Tag Based Routing

Netifi release 1.6.2 brings the first round of support for tag-based routing. In 1.6.2, when a client connects to the Netifi Broker it can add arbitrary tags to identify a destination. A tag consists of a key and a value, each of which can be any UTF-8 string. You can add multiple tags to a destination. Tags are associated with a destination at connection time and are currently static: a destination connected with the tag version:123 keeps that tag for the lifetime of its connection. If you need to change a tag, you currently have to reconnect.

Routing now uses tags as well. Using the Netifi Broker client you can create virtual routes based on tags. A route is matched only on the tags you send; any other tags associated with the connection are ignored for the purposes of matching. Tags are matched where they intersect: for instance, if you create a virtual route with 3 tags, a destination must have all 3 tags to be considered.

Netifi Tagging Support in Spring

This section covers Spring support for tagging. The BrokerClientFactory will look up implementations of the RSocket RPC clients using methods similar to the BrokerClient object. This interface will be an analog of the @BrokerClient annotation. The BrokerClientFactory should be in the package com.netifi.spring.core and look like the following:

package com.netifi.spring.core;

...

// T is the RSocket RPC client type that the factory looks up.
interface BrokerClientFactory<T> {
  enum Type { DESTINATION, GROUP, BROADCAST }

  T lookup(Type type, String group, Tags tags);

  // Proposed as default methods; their bodies are omitted in this outline.
  T lookup(Type type, String group, String... tags);
  T lookup(Type type);
  T lookup(Type type, Tags tags);
}

There should be interface analogs for the other BrokerClient alias annotations - @Group, @Broadcast, and @Destination. Here is pseudo-code for them:

// Pseudo-code: the proposal marks these as default methods; bodies are omitted.
interface GroupAwareClientFactory<T> extends BrokerClientFactory<T> {
  T group();
  T group(String group, Tag... tags);
  T group(String group, Tags tags);
  T group(Tag... tags);
  T group(Tags tags);
}

interface DestinationAwareClientFactory<T> extends BrokerClientFactory<T> {
  T destination();
  T destination(String destination);
  T destination(String destination, String group);
}

interface BroadcastAwareClientFactory<T> extends BrokerClientFactory<T> {
  T broadcast();
  T broadcast(String group, Tag... tags);
  T broadcast(String group, Tags tags);
  T broadcast(Tag... tags);
  T broadcast(Tags tags);
}

The @BrokerClient annotation and its aliases support annotating both RSocket RPC client variables and BrokerClientFactory variables. When an RSocket RPC client is annotated, the route created is static and cannot change at run-time. When @BrokerClient is added to a BrokerClientFactory variable, a BrokerClientFactory is injected, which lets the caller change the route dynamically based on the method called. The lookup method without arguments uses the defaults provided in the @BrokerClient annotation. The lookup methods with arguments override the annotation defaults. Alias annotations and interfaces work the same way. For instance, GroupAwareClientFactory.group(String group, Tags tags) overrides both the group and the tags, but GroupAwareClientFactory.group(Tags tags) overrides only the tags.

The same information is required for @Group, @Broadcast, and @Destination.

Annotations only work with their analog interface or with an RSocket RPC client. For example, the @Group annotation works with a GroupAwareClientFactory or with the service client directly. Conversely, a @BrokerClient annotation will not work with a GroupAwareClientFactory instance and produces an error. The *AwareClientFactory interfaces will not let you override the routing type. To override the routing type you must use the @BrokerClient annotation directly.

Example usage:

Example 1:

// Existing - route to mygroup
@Group(group="mygroup")
MyService service;

// With tags - route to mygroup using the region tag
@Group(group="mygroup", tags = Tags.of("region", "us-west"))
MyService taggedService;

Example 2:

@Group(group="mygroup")
GroupAwareClientFactory<MyService> serviceFactory;

// Route to mygroup
MyService service = serviceFactory.group();

// Route to another-group using the region tag
MyService regionalService = serviceFactory.group("another-group", Tags.of("region", "us-west"));

Example 3:

@Group(group="mygroup", tags = Tags.of("region", "us-west"))
GroupAwareClientFactory<MyService> serviceFactory;

// Route to mygroup using the region tag
MyService service = serviceFactory.group();

// Route to another-group with no tags
MyService untaggedService = serviceFactory.group("another-group", Tags.empty());

// Route to mygroup with the region tag, and add the phone tag
MyService phoneService = serviceFactory.groupAddTag(Tags.of("phone", "iOS"));

// Route to another-group using the region tag, but with another value, us-east
MyService eastService = serviceFactory.group("another-group", Tags.of("region", "us-east"));

Example 4:

@BrokerClient(group="mygroup")
BrokerClientFactory<MyService> serviceFactory;

// Route to group mygroup using broadcast
MyService broadcastService = serviceFactory.lookup(BROADCAST);

// Route to group mygroup, destination 123
MyService destinationService = serviceFactory.lookup(DESTINATION, "mygroup", "123");

// Route to group another-group using group routing
MyService groupService = serviceFactory.lookup(GROUP, "another-group");

One can add tags to auto-generated service clients. There are three ways to add tags:

  1. String literal arguments to the annotation

@Group(group = "mygroup", tags = Tags.of("region", "us-west"))
GroupAwareClientFactory<MyService> serviceFactory;

  2. A TagSupplier implementation

interface TagSupplier extends Supplier<Tags> {
  Tags get();
}

class MyDefaultTagSupplier implements TagSupplier {
  // Accumulated tags, built once from the configuration map.
  private Tags tags;

  public MyDefaultTagSupplier(Map<String, String> config) {
    this.tags = Tags.empty();
    for (Map.Entry<String, String> entry : config.entrySet()) {
      tags = tags.and(Tags.of(entry.getKey(), entry.getValue()));
    }
  }

  @Override
  public Tags get() {
    return tags;
  }
}

@Group(group = "mygroup", tagSupplier = MyDefaultTagSupplier.class)
GroupAwareClientFactory<MyService> serviceFactory;

  3. Via methods on the client factory

MyService service = serviceFactory.group(Tags.of("region", "us-east"));

In case there are conflicts in tag names, tags added by the ClientFactory method take precedence, followed by tags provided by the TagSupplier, with literals set in the Annotation arguments taking lowest precedence.
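The precedence rule can be pictured as a layered merge in which later layers overwrite earlier ones. This is a minimal sketch that uses plain maps instead of the Tags type; TagPrecedenceSketch and resolve are hypothetical names used only for illustration and are not part of the proposed API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper showing the precedence rule with plain maps instead of Tags.
class TagPrecedenceSketch {
    // Later putAll calls overwrite earlier entries, so the call order encodes the precedence:
    // annotation literals < TagSupplier < client factory method.
    public static Map<String, String> resolve(Map<String, String> annotationTags,
                                              Map<String, String> supplierTags,
                                              Map<String, String> methodTags) {
        Map<String, String> merged = new LinkedHashMap<>(annotationTags);
        merged.putAll(supplierTags);
        merged.putAll(methodTags);
        return merged;
    }
}
```

Tags whose names do not conflict are simply combined; only a name that appears in more than one layer is decided by the ordering.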

Here is a more complete example, reusing the TagSupplier and MyDefaultTagSupplier definitions shown above:

@Group(group = "mygroup", tagSupplier = MyDefaultTagSupplier.class, tags = Tags.of("region", "us-west"))
GroupAwareClientFactory<MyService> serviceFactory;

// Uses the group "mygroup" and the tag region=us-west plus all tags from MyDefaultTagSupplier's config.
// If MyDefaultTagSupplier has a "region" tag, that will supersede region=us-west.
MyService defaultService = serviceFactory.group();

// Uses the group "different-group" and the tag region=us-west plus all tags from MyDefaultTagSupplier's config.
// If MyDefaultTagSupplier has a "region" tag, that will supersede region=us-west.
MyService serviceOne = serviceFactory.group("different-group");

// Uses the group "another-group" and the tag region=us-west plus all tags from MyDefaultTagSupplier's config,
// plus clientVersion=1.2.3.
MyService serviceTwo = serviceFactory.groupAddTag("another-group", Tags.of("clientVersion", "1.2.3"));

// Uses the group "yet-another-group" and the tag region=us-east plus all tags from MyDefaultTagSupplier's config.
MyService serviceThree = serviceFactory.groupAddTag("yet-another-group", Tags.of("region", "us-east"));

Thanks,
Robert