Question:

I am using the following code to test the behaviour of Akka Streams' Flow.batch, but I can't figure out why the result is not what I expect:

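    import akka.stream.scaladsl.{Sink, Source}

    // An implicit Materializer (or, from Akka 2.6, an implicit ActorSystem) must be in scope.
    Source(1 to 20)
      .map { x =>
        println(s"received: $x")
        x
      }
      .batch(max = 3, first => first.toString) { (batch, elem) =>
        batch + "," + elem
      }
      .runWith(Sink.foreach { x =>
        Thread.sleep(4000) // simulate a slow sink
        println("Out:" + x)
      })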

And here is the output:

    received: 1
    received: 2
    received: 3
    received: 4
    Out:1,2,3
    received: 5
    Out:4
    received: 6
    Out:5
    received: 7
    Out:6
    received: 8
    Out:7
    received: 9
    Out:8
    received: 10
    Out:9
    received: 11
    Out:10
    received: 12
    Out:11
    ... and so on ...
    received: 19
    Out:18
    received: 20
    Out:19
    Out:20

There are a few points I don't understand:

  • First, my Sink is much slower than the source, so I expect elements to be batched together before being emitted downstream, e.g. Out: 1,2,3; Out: 4,5,6; Out: 7,8; Out: 9,10,11 and so on. Instead, they are batched only once (1,2,3); after that, elements are emitted one by one instead of being batched.
  • Why do I receive 4 items (received: 1, ..., received: 4) right at the beginning, when I only set max=3 (batch(max=3))?
  • The source is much faster than the sink, so I expect elements to be emitted by the source much faster, e.g. received: 7, received: 8, received: 9, then Out:7,8,9. In fact, they are only received sporadically, one at a time, and only after the Sink's println has executed.

I have tried changing map to mapAsync, but the behaviour is still not what I am looking for:

      .mapAsync(1) { x =>
        println(s"received: $x")
        Future.successful(x)
      }

Thanks.

Answer:

There is no asynchronous boundary anywhere in your code, so the whole stream runs on a single thread. While your Thread.sleep() executes, no other progress happens in this setup; in particular, batching cannot happen, because the thread is blocked in Thread.sleep. With such a setup you can simply use grouped() instead of batch, or perhaps groupedWithin(). If you still want to try out batch(), use a throttle stage instead of the sleep. Throttle does not block the thread, so upstream progress (batching) is not affected.
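Below is a minimal sketch of the throttle suggestion. It assumes Akka 2.6 or later, where an implicit ActorSystem provides the Materializer. It keeps the question's pipeline but replaces the blocking Thread.sleep sink with throttle(1, 500.millis, 1, ThrottleMode.Shaping), followed by a printing map. The object name BatchWithThrottle, the 500 ms rate, and collecting the batches with Sink.seq are illustrative choices and not part of the original answer. Batches are Vectors rather than comma-joined Strings, which makes them easier to inspect.

```scala
import akka.actor.ActorSystem
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object: the question's pipeline with throttle instead of Thread.sleep.
object BatchWithThrottle {
  // Runs the stream and returns the batches that reached the end of it.
  def run(): immutable.Seq[Vector[Int]] = {
    // Assumes Akka 2.6+, where an implicit ActorSystem supplies the Materializer.
    implicit val system: ActorSystem = ActorSystem("batch-demo")
    try {
      val done = Source(1 to 20)
        .map { x =>
          println(s"received: $x")
          x
        }
        // max = 3: the first element seeds a batch; later elements are appended
        // while downstream is not pulling, up to three per batch.
        .batch(3, (first: Int) => Vector(first)) { (acc, elem) => acc :+ elem }
        // Simulated slow consumer: at most one batch per 500 ms. Throttle waits on a
        // scheduled timer instead of blocking, so upstream can keep feeding batch.
        .throttle(1, 500.millis, 1, ThrottleMode.Shaping)
        .map { b =>
          println("Out:" + b.mkString(","))
          b
        }
        .runWith(Sink.seq)
      Await.result(done, 1.minute)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    run()
  }
}
```

Because nothing blocks the thread while a batch is held back, you should typically see batches of up to three elements after the first one. The exact grouping still depends on timing, so it is not guaranteed. Flattening the batches always gives back 1 to 20 in order.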
