mapWithState, similarly to updateState, can be used to create a stateful DStream based on upcoming data. It requires StateSpec:
import org.apache.spark.streaming._
object StatefulStats {
val state = StateSpec.function(
(key: String, current: Option[Double], state: State[StatCounter]) =&g...