We want to process data in parallel and push it down the line to be processed by other workers.
Since workers both consume and produce data, we have to create two queues:
first_input_source = Queue.new
first_output_sink = Queue.new
100.times { |i| first_input_source << i }
The first wave of workers reads an item from first_input_source, processes it, and writes the results to first_output_sink:
(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_sink << item ** 2
      first_output_sink << item ** 3
    end
  end
end
The second wave of workers uses first_output_sink as its input source: each worker reads an item, processes it, then writes to another output sink:
second_input_source = first_output_sink
second_output_sink = Queue.new
(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end
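Both waves share the same shape: pop from one queue, transform the item, push the results to another queue. Here is a minimal sketch of factoring that out. Note that start_stage is a hypothetical helper, not part of Ruby's standard library, and the queue names and the small worker counts are chosen only for illustration:

```ruby
# Hypothetical helper: starts `workers` threads that move items from
# `input` to `output`, pushing every value returned by the block.
def start_stage(input, output, workers, &block)
  Array.new(workers) do
    Thread.new do
      loop do
        item = input.pop # blocks while the queue is empty
        block.call(item).each { |result| output << result }
      end
    end
  end
end

source = Queue.new
middle = Queue.new
sink   = Queue.new
3.times { |i| source << i }

start_stage(source, middle, 2) { |item| [item ** 2, item ** 3] }
start_stage(middle, sink, 4)   { |item| [item * 2, item * 3] }

# 3 inputs produce 6 intermediate items and 12 final ones; popping exactly
# that many blocks until all of them have arrived.
results = Array.new(12) { sink.pop }
```

The order of results depends on thread scheduling, so compare it as a sorted list or a multiset rather than position by position.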
Now second_output_sink is the final sink. Let's convert it to an array:
sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }
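The sleep 5 above only guesses when the workers are done. Below is a minimal sketch of one alternative, assuming Ruby 2.3 or later, where Queue#close is available. Once a queue is closed and drained, pop returns nil instead of blocking, so each worker can leave its loop and the main thread can join it. The names first_wave, second_wave, and results are illustrative.

```ruby
first_input_source = Queue.new
first_output_sink  = Queue.new
second_output_sink = Queue.new

100.times { |i| first_input_source << i }
first_input_source.close # no more input; pop returns nil once drained

first_wave = Array.new(16) do
  Thread.new do
    # pop returns nil only after the queue is closed and empty
    while (item = first_input_source.pop)
      first_output_sink << item ** 2
      first_output_sink << item ** 3
    end
  end
end

second_wave = Array.new(32) do
  Thread.new do
    while (item = first_output_sink.pop)
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

first_wave.each(&:join)  # every first-wave result has been written
first_output_sink.close  # lets the second wave exit after draining
second_wave.each(&:join)

results = []
results << second_output_sink.pop until second_output_sink.empty?
```

This works here because every item is an integer, and 0 is truthy in Ruby. If nil or false were valid items, the loop condition would need a different end-of-stream check, such as an explicit sentinel object.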