[rsocket-java] is it possible onNext/onComplete is processed out of order?


#1

Hi folks, I’m using the rsocket java client in our product, and I noticed some strange behavior, which leads me to believe payloads may be processed out of order.

my code:

   AtomicBoolean first = new AtomicBoolean(true);
   return conn.requestStream(request)
            .map(
                payload -> {
                  if (first.getAndSet(false)) {
                    try {
                      // do something
                    } finally {
                      payload.release();
                    }
                  }
                  return payload;
                })
            .skip(1)
            .map(
                payload -> {
                  try {
                    // do something with payload and return the result
                    return payload.getDataUtf8(); // placeholder result
                  } finally {
                    payload.release();
                  }
                });

It processes the first payload, skips it, then processes the rest of the payloads using different logic. In very rare cases, I got an exception in the second map() complaining that the payload had already been released.

I think that’s possible if onNext is called out of order by the netty worker threads. Is that possible in the current rsocket-java implementation? If this is indeed the problem, then another, more severe issue is that if onNext and onComplete are processed out of order, the client may lose the last few payloads sent by the server.

Thanks,
Zhenyuan


#2

Hi,

I created an issue on the rsocket-java github page:

Thanks,
Robert


#3

Thanks. I’m thinking of using just one netty worker thread (there is a reactor system property for that). Do you think that would mitigate it?
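Something like this is what I have in mind (a sketch assuming reactor-netty 0.8.x, which reads the worker count from a system property; the property name may differ in other versions):

  // Assumption: reactor-netty 0.8.x property name. It must be set before the
  // first TcpClient is created, e.g. here or via -Dreactor.netty.ioWorkerCount=1.
  System.setProperty("reactor.netty.ioWorkerCount", "1");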

Thanks
Zhenyuan


#4

I’ve been unable to reproduce the issue testing it myself. Do you have more code/context around what you are doing? Seeing how the server is implemented would be helpful as well.


#5

Thanks for looking into this. I was not able to repro it either; it happens very rarely. I suspect there is an out-of-order issue, but I wasn’t sure that’s really the cause.

The server is very simple, just sending a sequence of payloads. There is nothing special about it as far as I can tell.

The client logic has two parts: creating connection and making request:

  TcpClient client =
      TcpClient.create()
          .host(address.getHost())
          .port(address.getPort())
          .option(CONNECT_TIMEOUT_MILLIS, CONN_TIME_OUT);

  Mono<RSocket> conn =
      RSocketFactory.connect()
          .setupPayload(DefaultPayload.create(SETUP_DATA, SETUP_METADATA))
          .frameDecoder(Frame::retain)
          .transport(TcpClientTransport.create(client))
          .start();

Then making the request:

  AtomicBoolean first = new AtomicBoolean(true);
  return conn.flux()
      .flatMap(s -> s.requestStream(request))
      .map(
          payload -> {
            if (first.getAndSet(false)) {
              try {
                // do something
                System.out.println(Thread.currentThread().getName());
              } finally {
                payload.release();
              }
            }
            return payload;
          })
      .skip(1)
      .map(
          payload -> {
            try {
              // do something with payload and return the result
              return payload.getDataUtf8(); // placeholder result
            } finally {
              payload.release();
            }
          });

I also dug a little deeper into the code:

For every stream request, a UnicastProcessor is created.

Inside UnicastProcessor, although multiple threads could be inserting items into the queue, only one thread can pull items out of the queue and call onNext on the subscribers.

I think there are at least two places that need to be checked:

  1. Queue implementation. Does the queue strictly enforce FIFO?
  2. How reactor-netty handles concurrent onNext calls for the same connection (whether it strictly follows rule 1.3: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md#1.3)

Thanks,
Zhenyuan


#6
  • Queue implementation. Does the queue strictly enforce FIFO?
    Yes, the queue strictly enforces order. The WIP counter is atomic, so it can only be updated by one thread at a time; only one thread will get ‘0’ and continue. A common pattern in reactor-core is to use drain loops guarded by atomic WIP variables to guarantee that only one thread drains the queue at a time. This matches the reactive streams rule you linked; see the sketch below.
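Roughly, the pattern looks like this (a minimal sketch of a WIP-guarded drain loop with illustrative names, not the actual UnicastProcessor source):

  import java.util.Queue;
  import java.util.concurrent.ConcurrentLinkedQueue;
  import java.util.concurrent.atomic.AtomicInteger;

  class DrainLoopSketch<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();

    void onNext(T item) {
      queue.offer(item); // any producer thread may enqueue
      drain();
    }

    private void drain() {
      // Only the thread that moves WIP from 0 to 1 enters the loop below;
      // everyone else just bumps the counter and leaves.
      if (wip.getAndIncrement() != 0) {
        return;
      }
      int missed = 1;
      for (;;) {
        T item;
        while ((item = queue.poll()) != null) {
          emit(item); // the single winning thread emits, in queue order
        }
        // If producers signalled while we drained, loop again; otherwise exit.
        missed = wip.addAndGet(-missed);
        if (missed == 0) {
          break;
        }
      }
    }

    private void emit(T item) {
      System.out.println("emit: " + item);
    }
  }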

I still haven’t been able to replicate it. Have you seen it again?


#7

Not exactly. I understand that the logic around WIP inside UnicastProcessor ensures single-threaded execution, but that doesn’t mean payloads are delivered in order, because the queue has its own isolated logic. The queue returned by the default supplier may or may not be FIFO in this multi-producer, single-consumer case. For example, I can think of two possible cases where out-of-order delivery could happen:

  1. Multiple threads process netty buffers out of order. Then even a FIFO queue won’t help.
  2. The default queue implementation is not FIFO in the multi-producer case. I tried to read the reactor Queues implementation, but wasn’t able to fully reason about it.

I was not able to repro this any more. In fact, I have switched to initializing my own TcpClient with a single-threaded worker pool to make sure neither #1 nor #2 above can happen; see the sketch below.
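For reference, this is roughly how I pinned the client to one worker (a sketch assuming the reactor-netty 0.8.x LoopResources API; host and port are illustrative):

  import reactor.netty.resources.LoopResources;
  import reactor.netty.tcp.TcpClient;

  // One daemon worker thread shared by every channel this client creates.
  LoopResources singleWorker = LoopResources.create("rsocket-client", 1, true);

  TcpClient client =
      TcpClient.create()
          .runOn(singleWorker)
          .host("localhost") // illustrative
          .port(7878);       // illustrative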

Thanks
Zhenyuan


#8

Netty isn’t really multi-threaded. The number of workers is the number of event loops. A single rsocket stream will be bound to the event loop/channel that it gets subscribed on. It will only drain on that thread, not multiple threads. The RSocket code and the reactor-netty code will make sure drains only execute on that event loop; it’s effectively single-threaded.

I read your first comment again, and you said you’re receiving an exception on releasing. The more I think about it, it’s probably a ByteBuf lifecycle issue. There is only one thread that would release it: the event loop/channel tied to the stream. Netty won’t use multiple threads to drain a channel. It seems like this is more about retaining/releasing payloads. The DuplexConnection only allows one subscriber, so there isn’t really a way for more than one thread to receive data from a DuplexConnection even if the underlying transport were multi-threaded. If this is still a problem, you should see it even if there is only one event loop.
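To make the lifecycle point concrete, here’s a standalone netty snippet (illustrative, not rsocket code) that produces the same class of error:

  import io.netty.buffer.ByteBuf;
  import io.netty.buffer.Unpooled;
  import io.netty.util.CharsetUtil;

  ByteBuf buf = Unpooled.copiedBuffer("payload", CharsetUtil.UTF_8);
  buf.release();  // refCnt: 1 -> 0, buffer is deallocated
  buf.release();  // throws IllegalReferenceCountException - the same class of
                  // error as touching a Payload that was already released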


#9

Yes, I agree a ByteBuf-related problem is unlikely to be caused by the rsocket implementation, after reading through the UnicastProcessor implementation: there is only one thread that drains the queue inside it. In fact, after I switched to using one worker thread per connection, I no longer see this issue at all.

However, UnicastProcessor.onNext and UnicastProcessor.onComplete could still be called out of order, right?

Netty isn’t really multi-threaded. The number of workers is the number of event loops. A single rsocket stream will be bound to the event loop/channel that it gets subscribed on.

I’m confused by what you explained above. If I use the default TcpClient, it creates an event loop group sized to the number of cores I have. When I dump the thread name in UnicastProcessor.onNext, I do see different thread names. That means netty does use multiple threads from a single loop group. It’s a question, though, whether different threads are calling UnicastProcessor.onNext simultaneously or one at a time. If simultaneously, then onNext and onComplete may be out of order.

Again, I would like to restate that the queue implementation is important. As you can see from the usage of UnicastProcessor, producers may not be synchronized, and if the queue doesn’t ensure FIFO, the single thread that drains it could still be processing payloads out of order.

Hope this clarifies.

Thanks,
Zhenyuan


#10

UnicastProcessor is only drained on the thread of the channel it belongs to. If you have 2 event loops, A and B, A will not drain data enqueued on event loop B. You will see two different threads, however, if you have enough requests, because netty selects an event loop behind the scenes to attach each channel to. Channels aren’t thread-safe and are bound to a single event loop. The code functions the same regardless of whether the pool has a single worker or multiple workers; see the sketch below.
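Concretely, this is the dispatch rule netty enforces internally (a minimal sketch; dispatch and work are illustrative names, not netty API):

  import io.netty.channel.Channel;

  // Every operation on a channel runs on the one event loop thread the
  // channel was registered with; other threads must hop onto it.
  void dispatch(Channel channel, Runnable work) {
    if (channel.eventLoop().inEventLoop()) {
      work.run();                        // already on the channel's thread
    } else {
      channel.eventLoop().execute(work); // hand off to the channel's thread
    }
  }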

.map(…).skip(1).map(…) is going to be executed synchronously, all on the same thread. The only way it would be on different threads is if you did something like
.flatMap(… .subscribeOn(Schedulers.parallel())).skip(1).flatMap(… .subscribeOn(Schedulers.parallel()))
That would cause both flatMaps to execute on potentially different threads.
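For example, a runnable sketch (the fromCallable body is a stand-in for your per-payload work):

  import reactor.core.publisher.Flux;
  import reactor.core.publisher.Mono;
  import reactor.core.scheduler.Schedulers;

  Flux.range(1, 5)
      .flatMap(i ->
          Mono.fromCallable(() -> {
                // stand-in for per-payload work; prints a parallel thread name
                System.out.println("item " + i + " on " + Thread.currentThread().getName());
                return i;
              })
              .subscribeOn(Schedulers.parallel()))
      .blockLast();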

Also, the order your Payloads are emitted in will not affect what you wrote. If you had a stream A, B, C and the data was emitted C, A, B, the emissions would trigger your code the same way. The queues in reactor-core are thread-safe and only allow one thread to drain them. The only time you can run into a problem with this is if your application has multiple threads calling onNext on a processor, for instance if you have a processor that is shared between channels (which is not the case with RSocket, because it doesn’t share any processors between channels). If you do have a processor that is shared across threads and you care about the order your data is emitted, then you can use the serialize() method when creating the processor to ensure order, or create a synchronized method somewhere.
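For example (a minimal sketch; only relevant when a processor is shared across threads, which RSocket doesn’t do):

  import reactor.core.publisher.FluxProcessor;
  import reactor.core.publisher.UnicastProcessor;

  UnicastProcessor<String> raw = UnicastProcessor.create();
  FluxProcessor<String, String> safe = raw.serialize();
  safe.subscribe(System.out::println);

  // Multiple threads may now call onNext without violating rule 1.3.
  new Thread(() -> safe.onNext("from-thread-a")).start();
  new Thread(() -> safe.onNext("from-thread-b")).start();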


#11

I think I understand it now; I didn’t realize that netty binds a channel to one thread. So, in this single-connection/channel case, only one (and the same) thread is picked to do all the work, regardless of the number of threads in the loop. (On the other hand, having more threads in the loop is not going to help either.) With all that, this is effectively single-threaded.

Thanks for your patience :slight_smile:


#12

No problem - let me know if you need help with anything else.