WebSockets - subscribing to specific item's events, using RxJava

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

WebSockets - subscribing to specific item's events, using RxJava

Asaf
Hey,

First, I'd like to say thanks for this great Framework, it's really a game changer IMO for Java8 & Groovy lovers.

I have two questions, If I may,
Lets say I'd like clients to subscribe/unsubscribe to some stock's specific event types, subscription should receive events only related to that stock, connection should never get closed unless the client unsubscribe.

What would be the appropriate way to do such subscription/publishing for specific event types in a model context?

Another thing (related in a way), I'd like to publish RxJava Observable as WS events,
I tried:

chain.get("publish", ctx -> {
  Observable<String> o = Observable.just("a", "b", "c");
  Publisher<String> p = publisher(o);
  WebSockets.websocketBroadcast(ctx, p);
});

But I get:
ratpack.handling.DoubleTransmissionException: attempt at double transmission for: /api/publish

Same result when trying to convert an Observable to a Publisher via RxReactiveStreams.toPublisher


Thanks!

Asaf.
Reply | Threaded
Open this post in threaded view
|

Re: WebSockets - subscribing to specific item's events, using RxJava

danveloper
Administrator
I think I understand what you're asking. The basic problem that you're running into right now is that a Handler is a functional interface and thus retains no state. Persistent WebSocket links, on the other hand, require state. To that extent, the publisher to a WebSocket stream must be retained in a stateful object so that message publication can be managed outside of the handler.

For example, here is a Ratpack app that uses a StreamContainer class to retain and provide the publisher for a given StreamType:

public class Main {

  static class StreamContainer implements Service {
    enum StreamType {
      A, B, C;
    }

    private Map<StreamType, PublishSubject<String>> streamStorage = Maps.newHashMap();

    public void onStart(StartEvent event) {
      for (StreamType t : StreamType.values()) {
        streamStorage.put(t, PublishSubject.<String>create());
      }
    }

    public void publish(StreamType type, String message) {
      streamStorage.get(type).onNext(message);
    }

    public Publisher<String> getStream(StreamType t) {
      return RxRatpack.publisher(streamStorage.get(t));
    }
  }

  public static void main(String[] args) throws Exception {
    RatpackServer.start(spec -> spec
      .serverConfig(sbuild -> sbuild
          .baseDir(BaseDir.find())
      )
      .registryOf(rspec -> rspec
        .add(new StreamContainer())
      )
      .handlers(chain -> chain
              .get("ws/:type?", ctx -> {
                StreamContainer container = ctx.get(StreamContainer.class);
                StreamContainer.StreamType type = StreamContainer.StreamType.valueOf(ctx.getPathTokens().getOrDefault("type", "A"));
                WebSockets.websocketBroadcast(ctx, container.getStream(type));
              })
      )
    );
  }
}

The Publisher in this case is derived from a RxJava PublishSubject. The StreamContainer class is queried for the publisher when a client subscribes to the endpoint for a specified StreamType.

I've pushed the example code to GitHub => https://github.com/danveloper/ratpack-rx-websockets

Let me know if this helps or if this needs further clarification. Thanks!
Reply | Threaded
Open this post in threaded view
|

Re: WebSockets - subscribing to specific item's events, using RxJava

Asaf
Dan,

Awesome, thanks for the investing the time with the detailed answer and the code sample, I'll take a look at it tomorrow morning.


One thing i'm curious about (I'm not sure about all the details as this is all new to me),  but if we'r dealing with a non persisted WS link such as just streaming a pipeline of strings,

as streaming a periodically publisher via WS broadcast through a  handler:

//this works just fine
Publisher<String> stream = periodically(ctx, Duration.ofSeconds(2), i -> i < 5 ? i.toString() : null);
WebSockets.websocketBroadcast(ctx, stream);

Why would broadcasting a publisher created by RxReactiveStreams.toPublisher(observableInstance) would fail?

Is it related to the fact that context is being passed to the periodically function?


Yet again, thanks!

Asaf.
Reply | Threaded
Open this post in threaded view
|

Re: WebSockets - subscribing to specific item's events, using RxJava

Stella Robinson
RxJava is ruled the world of Java development from last couple of years. The best use of RxJava is to develop an Android applications. RxJava got an enormous response in the Android world because it fully matches the modern solutions for writing a composable code.


Rxjava becomes more easy with Retrofit for mobile app development Retrofit is a sort of safe REST Android client created by Square. The library gives a potential framework for confirming and interfacing with APIs and sending parsing system requests in alignment with OkHttp