Promise flatMap iteration from initial result set

WesMangum
I am trying to iterate over an initial SQL result set and use each row to drive an additional database query. Before working toward an Rx solution I wanted to get the basics right. From my troubleshooting so far, it looks like I am creating a new pair on each iteration, but it gets thrown away. I think I may need another flatMap level, but I cannot work out how.

...
import groovy.sql.Sql
import ratpack.groovy.sql.SqlModule
import ratpack.exec.Blocking
import ratpack.exec.Promise
import ratpack.func.Pair
import static ratpack.groovy.Groovy.ratpack
import static ratpack.groovy.Groovy.groovyMarkupTemplate
import ratpack.groovy.template.MarkupTemplateModule
import java.util.stream.Stream

def buildPairStream (pair) {
  // Declare visit before assigning it so the closure can call itself recursively.
  def visit
  visit = { node, builder ->
    if (node instanceof Pair) {
      visit node.left, builder
      visit node.right, builder
    } else {
      builder.add node
    }
  }
  def streamBuilder = Stream.builder ()
  visit pair, streamBuilder
  return streamBuilder.build ()
}

ratpack {
  ...
  handlers {
    get ("iterate") { Sql sql ->
      Blocking.get { [heading: 'Driving table', rows: sql.rows ('with recursive (value) as (select 1 from dual union all select value + 1 from recursive where value < 10) select value from recursive')] }
      .flatMap { initial ->
        Blocking.get { [heading: 'Supplemental table', rows: sql.rows ('select dummy from dual')] }
        .map { dataset -> Pair.of (initial, dataset) }
        .flatMap { pair ->
          pair.left.rows.each {
            Blocking.get { [heading: 'Iteration ${it.value}', rows: sql.rows ("select ${it.value} as value from dual")] }
            .map { dataset -> pair.pushRight (dataset) }
          }
          Promise.value pair
        }
      }
      .then { render groovyMarkupTemplate ('tables.gtpl', heading: 'FlatMap Pair Iteration Example - Version 1', results: buildPairStream (it)) }
    }
  }
}

// in file templates/tables.gtpl:
...
    results.forEach { dataset ->
...

Re: Promise flatMap iteration from initial result set

danhyun
Hi Wes,

It looks like the problem is here:
```
pair.left.rows.each {
  Blocking.get { [heading: 'Iteration ${it.value}', rows: sql.rows ("select ${it.value} as value from dual")] }
  .map { dataset -> pair.pushRight (dataset) }
}
Promise.value pair
```

In this flatMap, you simply return Promise.value(pair), which does _not_ process the new Blocking promises that you defined here. flatMap takes the Promise returned by your function, subscribes to it, and unpacks its contents for further processing. The Promises created inside the each loop are never returned or subscribed to, so they never get resolved.
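
To make the point concrete, here is a minimal sketch of that laziness, run outside a Ratpack app with ExecHarness from ratpack-test. The @Grab coordinates and version are an assumption, and the string values stand in for the real pair and query results. The first Blocking.get is created and dropped, so its closure never runs; only the promise that flatMap gets back is executed.

```groovy
@Grab('io.ratpack:ratpack-test:1.9.0') // assumed version, adjust to your build
import ratpack.exec.Blocking
import ratpack.exec.Promise
import ratpack.test.exec.ExecHarness

// Records which blocking closures actually ran.
def executed = []

def result = ExecHarness.yieldSingle { exec ->
  Promise.value('pair').flatMap { pair ->
    // Created but neither returned nor subscribed to: this closure never runs.
    Blocking.get { executed << 'dropped'; 'ignored' }
    // Returned to flatMap, which subscribes to it: this closure runs.
    Blocking.get { executed << 'returned'; pair }
  }
}.valueOrThrow

println "executed: ${executed}, result: ${result}"
```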

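You would probably have to do something like this (Execution and java.util.concurrent.atomic.AtomicInteger need to be imported):

.flatMap { pair ->
  Promise.of { downstream ->
    def rows = pair.left.rows
    AtomicInteger i = new AtomicInteger()
    // Forked executions may finish on different threads, so guard the list.
    def results = Collections.synchronizedList([])

    // Nothing to fork, so signal downstream right away instead of waiting forever.
    if (rows.empty) { downstream.success(results); return }

    rows.each { item ->
      Execution
        .fork()
        .onComplete { e ->
          // Signal downstream once every forked execution has completed.
          int j = i.incrementAndGet()
          if (j == rows.size()) {
            downstream.success(results)
          }
        }.onError { e -> i.incrementAndGet() }
        .start { e ->
          Blocking.get { [heading: "Iteration ${item.value}", rows: sql.rows ("select ${item.value} as value from dual")] }
          .map { dataset -> pair.pushRight (dataset) } .then { results.add(it) }
        }
    }
  }
}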
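
If the per-row queries do not need to run in parallel, the extra flatMap level mentioned in the question can also be built by folding over the rows. The following is only a sketch under assumptions: fakeQuery and drivingRows are hypothetical stand-ins for sql.rows and pair.left.rows, and the @Grab version is a guess. Each row adds one flatMap to the chain, so every Blocking.get ends up subscribed, and the datasets come back in row order.

```groovy
@Grab('io.ratpack:ratpack-test:1.9.0') // assumed version, adjust to your build
import ratpack.exec.Blocking
import ratpack.exec.Promise
import ratpack.test.exec.ExecHarness

// Hypothetical stand-in for sql.rows(...), so the sketch runs without a database.
def fakeQuery = { value -> [[value: value]] }

// Hypothetical stand-in for pair.left.rows.
def drivingRows = [[value: 1], [value: 2], [value: 3]]

def datasets = ExecHarness.yieldSingle { exec ->
  // Start from a resolved promise of an empty list and add one flatMap per
  // row, so each blocking query is part of the chain that gets subscribed.
  drivingRows.inject(Promise.value([])) { Promise acc, row ->
    acc.flatMap { collected ->
      Blocking.get { [heading: "Iteration ${row.value}".toString(), rows: fakeQuery(row.value)] }
        .map { dataset -> collected + [dataset] }
    }
  }
}.valueOrThrow

println datasets*.heading
```

The queries run one after another here, which trades throughput for much simpler bookkeeping than the forked version.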

Promise has Promise#blockingMap and Promise#blockingOp

https://ratpack.io/manual/current/api/ratpack/exec/Promise.html#blockingMap-ratpack.func.Function-
https://ratpack.io/manual/current/api/ratpack/exec/Promise.html#blockingOp-ratpack.func.Action-

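blockingMap does the same thing as

Blocking.get { }.flatMap { Blocking.get {} }

so this could just become Blocking.get { }.blockingMap { }. blockingOp is the counterpart for a blocking side effect: it runs the action on a blocking thread and passes the original value through.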
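
As a rough sketch of how that could look for this use case: drivingQuery and iterationQuery below are hypothetical stand-ins for the two sql.rows calls, and the @Grab version is an assumption. Because blockingMap already runs on a blocking thread, the per-row queries can be issued in a plain loop inside it.

```groovy
@Grab('io.ratpack:ratpack-test:1.9.0') // assumed version, adjust to your build
import ratpack.exec.Blocking
import ratpack.test.exec.ExecHarness

// Hypothetical stand-ins for the driving query and the per-row query.
def drivingQuery = { [[value: 1], [value: 2], [value: 3]] }
def iterationQuery = { value -> [[value: value]] }

def datasets = ExecHarness.yieldSingle { exec ->
  Blocking.get { drivingQuery() }
    // Runs on a blocking thread, so ordinary blocking calls are fine in here.
    .blockingMap { rows ->
      rows.collect { row ->
        [heading: "Iteration ${row.value}".toString(), rows: iterationQuery(row.value)]
      }
    }
}.valueOrThrow

println datasets*.heading
```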