Halving throughput


#1

Hello guys,
I’m currently benchmarking my application, which uses RSocket. I noticed that with a single thread I get a throughput of about 50k requests per second, and with 2 threads also about 50k/s each. But with 4 threads it’s 25k/s and with 8 threads about 12k/s, so the throughput halves every time the number of threads doubles.

I’m using the channel interaction model over TCP.

The server just responds to the requester with the request payload:

@Override
public Flux<Payload> requestChannel(final Publisher<Payload> payloads) {
	return Flux.from(payloads)
			.subscribeOn(server)
			.publishOn(server)
			.share();
}

The client:
client.requestChannel(
		Flux.fromIterable(list)
				.delayElements(carConfiguration.DELAY)
				.subscribeOn(scheduler)
				.map(coordinate -> DefaultPayload.create(Serializer.serialize(coordinate)))
				.publishOn(scheduler)
				.share())
		.subscribe();

I’ve run every concurrent client on a separate thread in a thread pool sized to the number of processor cores.
Do you have any idea why this is happening?


#2

A couple of reasons. The Java RSocket implementation is non-blocking and uses event loops. Unless you’re doing something that could block an event loop - like a network call using a blocking library - you don’t need to tell Reactor where to schedule work, and adding threads will not help performance, as you found out. Hopping between threads causes context switching and CPU cache misses, so you want to stay on the same thread as much as possible. RSocket and reactor-core, while non-blocking, do whatever they can to make your code run on one thread, which is much more performant. They do allow you to move work to other threads, but if you don’t need to, leaving out the scheduling hints lets them apply further optimizations.

@Override
public Flux<Payload> requestChannel(final Publisher<Payload> payloads) {
	return Flux.from(payloads)
			.subscribeOn(server)
			.publishOn(server)
			.share();
}

It looks like you’re trying to share a publisher between channels. You can’t really share the same stream with all callers this way using RSocket - each payloads stream is unique to its requestChannel call. You don’t need to call subscribeOn, publishOn, or share.
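On the server side, that leaves a minimal echo handler - a sketch of the same requestChannel with the scheduling operators removed:

@Override
public Flux<Payload> requestChannel(final Publisher<Payload> payloads) {
	// Stay on the event loop that received the payloads:
	// no subscribeOn, publishOn, or share needed.
	return Flux.from(payloads);
}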

If you’re not intentionally sharing, you could just rewrite the client like this:
client.requestChannel(Flux
		.fromIterable(list)
		.delayElements(carConfiguration.DELAY)
		.map(coordinate -> DefaultPayload.create(Serializer.serialize(coordinate)))
).subscribe();

If you would like a single publisher that is shared by multiple threads let me know and we can get you an example.

Regarding the client:
client.requestChannel(
		Flux.fromIterable(list)
				.delayElements(carConfiguration.DELAY)
				.subscribeOn(scheduler)
				.map(coordinate -> DefaultPayload.create(Serializer.serialize(coordinate)))
				.publishOn(scheduler)
				.share())
		.subscribe();

You don’t need the publishOn or the subscribeOn there. The request will be sent over the network in a non-blocking manner and scheduled back on an event loop, so there is no need to schedule it on another thread. Since RSocket TCP binds a connection to a core, you need more connections to test scaling. You could do something like this:

Publisher<Payload> request = client.requestChannel(Flux
    .fromIterable(list)
    .delayElements(carConfiguration.DELAY)
    .map(coordinate -> DefaultPayload.create(Serializer.serialize(coordinate))));

int numRequests = 4; // the number of concurrent requests you want
Flux.range(1, numRequests).flatMap(i -> request).subscribe();

Under the hood each new request will select a new event loop and spread your load out.
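If you want each simulated client on its own connection, here’s a rough sketch (assuming the newer RSocketConnector API and an illustrative localhost:7000 endpoint - adjust to your setup) that opens several connections so they land on different event loops:

int numConnections = 4; // one connection per simulated client
Flux.range(1, numConnections)
		.flatMap(i -> RSocketConnector.create()
				// each connect() opens a separate TCP connection with its own event loop
				.connect(TcpClientTransport.create("localhost", 7000))
				.flatMapMany(rsocket -> rsocket.requestChannel(Flux
						.fromIterable(list)
						.delayElements(carConfiguration.DELAY)
						.map(coordinate -> DefaultPayload.create(Serializer.serialize(coordinate))))))
		.subscribe();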

Final thought - if you’re testing this on the same machine then, depending on how you have things set up, your RSocket event loops will start fighting for the same cores, leading to context switching and so on.

Let me know if this clears it up or if you have other questions.

Thanks,
Robert


#3

Thank you, Robert, for the reply. I’ve applied your recommendations.

It makes sense now. There is no performance gain at all because I’m on a local machine.

I have another question:
I’m doing a technology comparison between java.net sockets and RSocket. I compare the throughput of both, but I noticed some strange behaviour:
On the plain Java socket server I sleep 10 ms to simulate expensive processing time. When I do the same sleep on the server side of RSocket, it slows down really badly. Without the sleep, the RSocket application has 5 times the throughput of the Java socket application. With the sleep, both applications have the same throughput.

That’s the future that receives the client payload, sleeps 10 ms, and echoes the payload back:

CompletableFuture.runAsync(() -> {
	ObjectInputStream ois;
	ObjectOutputStream oos;
	while (true) {
		try {
			assert finalClientSocket != null;
			ois = new ObjectInputStream(new BufferedInputStream(finalClientSocket.getInputStream()));
			Coordinate coordinate = (Coordinate) ois.readObject();

			// the expensive "work"
			Thread.sleep(10);

			oos = new ObjectOutputStream(new BufferedOutputStream(finalClientSocket.getOutputStream()));
			oos.writeObject(coordinate);
			oos.flush();
		} catch (IOException | ClassNotFoundException | InterruptedException e) {
			System.err.println("[Server] Connection to Client was dropped " + e.getMessage());
			break;
		}
	}
}, executorService);

On the other hand, here is the requestChannel method from the acceptor class:

@Override
public Flux<Payload> requestChannel(final Publisher<Payload> payloads) {
    return Flux.from(payloads)
            .delayElements(Duration.ofMillis(10)); //the expensive "work"
}

My measurements are as follows:

RSocket

  • without sleep: 45848 requests/second
  • with sleep: 92 requests/second

Java Socket

  • without sleep: 13231 requests/second
  • with sleep: 92 requests/second

I think the delayElements method slows down the emitting as well as the receiving. It should instead only add the 10 ms processing delay on the receiving side, like the sleep does. :confused:

Do you have any idea how I can accomplish that?


#4

I think the delay method you want is delaySequence. The docs on delayElements indicate that it introduces a minimum delay of 10 ms between emissions, which hard-caps your throughput at 100/s (1 s / 0.01 s) (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#delayElements-java.time.Duration-).
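For example, a minimal sketch of the server side using delaySequence instead (every payload is shifted by 10 ms, but the spacing between elements - and therefore the throughput - is preserved):

@Override
public Flux<Payload> requestChannel(final Publisher<Payload> payloads) {
	// delaySequence shifts each signal by 10 ms but keeps the original
	// inter-element spacing, so throughput is no longer capped at ~100/s.
	return Flux.from(payloads)
			.delaySequence(Duration.ofMillis(10));
}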


#5

Hi,

I’ve created an example here:

When you run it you will see messages on the client like:
got a message [527062 - sending message 1550013183903] at 1550013183914

This message contains two millisecond timestamps - the send time, 1550013183903, and the receive time, 1550013183914 - a difference of 11 ms, so the request is being delayed by approximately 10 milliseconds.
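For reference, here’s a rough sketch (illustrative names, assuming a connected rsocket) of how a client like that can embed the send time in the payload and log the receive time:

rsocket.requestChannel(
		Flux.interval(Duration.ofMillis(100)) // illustrative send rate
				.map(i -> DefaultPayload.create(i + " - sending message " + System.currentTimeMillis())))
		.doOnNext(payload -> System.out.println(
				"got a message [" + payload.getDataUtf8() + "] at " + System.currentTimeMillis()))
		.subscribe();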

Thanks,
Robert