Handling exceptions when using streamed responses.

JamesL
Hi Guys,

I'm having a few issues with Response.sendStream (https://ratpack.io/manual/current/api/ratpack/http/Response.html#sendStream-org.reactivestreams.Publisher-).

My "publisher" makes a remote call using Apache HttpClient (I know, I wanted to use the Ratpack HTTP client, but I need connection pooling) and converts the response to various formats. Of course this remote call can fail, and I need to be able to handle the failure (or at the very least log it), but I can't see how.

I've looked through DefaultResponseTransmitter, and context.error (https://ratpack.io/manual/current/api/ratpack/handling/Context.html#error-java.lang.Throwable-) is never called there, unlike when context.render is used (https://github.com/ratpack/ratpack/blob/master/ratpack-core/src/main/java/ratpack/handling/internal/DefaultContext.java#L304), so my ServerErrorHandler is never invoked. Is this by design, or am I missing something?

Thanks in advance, James.

Re: Handling exceptions when using streamed responses.

danhyun
The contract between a Publisher and a Subscriber is that the Subscriber subscribes to the Publisher, which can then start sending data to it.

When a Subscriber subscribes, the Publisher gets a handle to the Subscriber instance, which looks like this: http://www.reactive-streams.org/reactive-streams-1.0.0.RC4-javadoc/org/reactivestreams/Subscriber.html

In particular, you may be interested in the Subscriber#onError(Throwable) method, which notifies the Subscriber that an error has occurred.

In this case I'd imagine your code would look something like:


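Publisher<ByteBuf> publisher = subscriber -> {
  // Simplified: a spec-compliant Publisher must first call
  // subscriber.onSubscribe(subscription) and emit only what has been requested.
  try {
    HttpResponse response = apacheClient.execute(request);
    // convert response to ByteBuf
    // stream response to subscriber
    // subscriber.onNext(someChunkFromResponse);
    subscriber.onComplete();
  } catch (IOException ioe) {
    // tell the subscriber that the remote call failed
    subscriber.onError(ioe);
  }
};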

context.getResponse().sendStream(publisher);
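
A slightly fuller sketch of the same idea, this time honoring the onSubscribe/request handshake. To stay self-contained it uses the JDK's java.util.concurrent.Flow interfaces (which mirror org.reactivestreams) and String chunks instead of ByteBuf. RemoteCallPublisher and its fetch callable are hypothetical stand-ins for the Apache client call, and the code is single-threaded, so treat it as an illustration and not a production implementation:

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Flow;

// Hypothetical publisher: performs the blocking call on the first request(n),
// then emits the chunks on demand or signals onError if the call fails.
final class RemoteCallPublisher implements Flow.Publisher<String> {
  // Stand-in for apacheClient.execute(request) plus response conversion.
  private final Callable<List<String>> fetch;

  RemoteCallPublisher(Callable<List<String>> fetch) {
    this.fetch = fetch;
  }

  @Override
  public void subscribe(Flow.Subscriber<? super String> subscriber) {
    // onSubscribe must be signalled before any other signal.
    subscriber.onSubscribe(new Flow.Subscription() {
      private Iterator<String> chunks;
      private long demand;
      private boolean emitting;
      private boolean done;

      @Override
      public void request(long n) {
        if (done) return;
        if (n <= 0) {
          done = true;
          subscriber.onError(new IllegalArgumentException("request must be positive"));
          return;
        }
        demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
        if (emitting) return; // re-entrant request from onNext; the loop below sees it
        emitting = true;
        try {
          if (chunks == null) {
            try {
              chunks = fetch.call().iterator(); // the remote call may throw here
            } catch (Exception e) {
              done = true;
              subscriber.onError(e); // terminal signal: nothing may follow it
              return;
            }
          }
          while (!done && demand > 0 && chunks.hasNext()) {
            demand--;
            subscriber.onNext(chunks.next());
          }
          if (!done && !chunks.hasNext()) {
            done = true;
            subscriber.onComplete();
          }
        } finally {
          emitting = false;
        }
      }

      @Override
      public void cancel() {
        done = true; // stop emitting; no further signals are sent
      }
    });
  }
}
```

With Ratpack, the org.reactivestreams equivalent of such a publisher (emitting ByteBuf) would be what gets passed to sendStream.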

I hope this helps.

Re: Handling exceptions when using streamed responses.

JamesL
Hi danhyun, thanks for the response.

Essentially that is what my code does, though the output travels through a few reactive "processors" on the way out to convert from msgpack to JSON or XML. I've even added a "wiretap" and can see the exception propagate downstream:

// after map() the elements are Netty ByteBufs, so the publisher is typed accordingly
TransformablePublisher<ByteBuf> indexQueryResponse = indexQueryFactory.newQuery()
        .execute()
        .wiretap { StreamEvent streamEvent ->
            // log any error signal on its way downstream
            if (streamEvent.error)
                log.error("query error ${streamEvent.throwable.message}", streamEvent.throwable)
        }
        .map { Unpooled.copiedBuffer(it) }
        .buffer()

context.render(bufferChunks('text/xml', indexQueryResponse))

The exception appears in the console so I know it's getting out to Ratpack correctly ("WARN  r.s.i.DefaultResponseTransmitter - Exception thrown transmitting stream").

My problem is that, as far as I know, there is no way for me to respond to it. The only thing I can think of is to put another Processor in the chain that intercepts onError calls and calls context.error, but that seems a little hacky to me :(
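
For reference, the interception idea could be sketched roughly as follows. This is an illustration only: ErrorTap is a hypothetical name, the JDK's java.util.concurrent.Flow interfaces stand in for org.reactivestreams, and the callback is a plain Consumer<Throwable> (in a Ratpack handler it might be something like context::error):

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical pass-through stage: forwards every signal unchanged and also
// hands any upstream error to a callback before passing it downstream.
final class ErrorTap<T> implements Flow.Publisher<T> {
  private final Flow.Publisher<T> upstream;
  private final Consumer<Throwable> errorHandler;

  ErrorTap(Flow.Publisher<T> upstream, Consumer<Throwable> errorHandler) {
    this.upstream = upstream;
    this.errorHandler = errorHandler;
  }

  @Override
  public void subscribe(Flow.Subscriber<? super T> downstream) {
    upstream.subscribe(new Flow.Subscriber<T>() {
      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        downstream.onSubscribe(subscription); // demand flows straight through
      }

      @Override
      public void onNext(T item) {
        downstream.onNext(item);
      }

      @Override
      public void onError(Throwable t) {
        try {
          errorHandler.accept(t); // e.g. log it or notify an error handler
        } finally {
          downstream.onError(t); // the stream still terminates normally
        }
      }

      @Override
      public void onComplete() {
        downstream.onComplete();
      }
    });
  }
}
```

One caveat with this approach: if part of the response has already been written when the error arrives, the callback can probably still log the failure but may no longer be able to change what the client receives.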

I've just uploaded the gist below, which highlights my problem: calls to "/render-error" notify the ServerErrorHandler, but calls to "/streamed" do not.

https://gist.github.com/brucenunk/ecb87ab9965932a6c61a

As I said, I've looked at DefaultResponseTransmitter (which is the "subscriber") and there is no sign of any call to context.error.