You can dynamically fork a flow into multiple subflows using groupBy. The stages that follow are applied to each subflow independently until you merge the subflows back into a single flow using mergeSubstreams.
val sumByKey: Flow[(String, Int), Int, NotUsed] = 
  Flow[(String, Int)].
    groupBy(Int.MaxValue, _._1).  // forks the flow: one subflow per distinct key (first tuple element)
    ...
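
The elided stages above are left as they are. As a separate illustration of the fork, process, and merge pattern, here is a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem supplies the materializer). The names GroupBySketch, sumPerKey, and run are hypothetical and not from the original snippet; unlike sumByKey, this flow emits (key, sum) pairs so the result can be inspected per key.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object GroupBySketch {
  // Hypothetical flow: emits one (key, sum) pair per distinct key.
  val sumPerKey: Flow[(String, Int), (String, Int), NotUsed] =
    Flow[(String, Int)]
      .groupBy(Int.MaxValue, _._1)            // fork: one subflow per distinct key
      .reduce((a, b) => (a._1, a._2 + b._2))  // applied inside each subflow
      .mergeSubstreams                        // merge the subflows back into one flow

  // Runs the flow over a finite input and collects the per-key sums.
  // reduce only emits once its subflow completes, so the input must be finite.
  def run(input: List[(String, Int)]): Map[String, Int] = {
    implicit val system: ActorSystem = ActorSystem("groupby-sketch")
    try {
      val result = Source(input).via(sumPerKey).runWith(Sink.seq)
      Await.result(result, 5.seconds).toMap
    } finally system.terminate()
  }
}
```

Calling GroupBySketch.run(List("a" -> 1, "b" -> 2, "a" -> 3)) should yield Map("a" -> 4, "b" -> 2). The result is collected into a Map because mergeSubstreams does not guarantee the order in which elements from different subflows are emitted.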