Ruby Language Une source - Pipeline of Work - Un évier


Exemple

Nous souhaitons traiter les données en parallèle et les acheminer sur la ligne à traiter par les autres travailleurs.

Puisque les travailleurs consomment et produisent des données, nous devons créer deux files d'attente:

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

La première vague de travailleurs lit un élément de first_input_source , traite l'élément et écrit les résultats dans first_output_sink :

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

La deuxième vague de travailleurs utilise first_output_sink comme source d’entrée et lit, puis écrit dans un autre first_output_sink de sortie:

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

Maintenant second_output_sink est le second_output_sink , convertissons-le en un tableau:

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }