Tutorial by Examples

We want to gather data created by multiple workers. First we create a Queue:

    sink = Queue.new

Then we start 16 workers, each generating a random number and pushing it into sink:

    (1..16).to_a.map do
      Thread.new do
        sink << rand(1..100)
      end
    end.map(&:join)

And to get the data, conver...

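
The gathering pattern above can be sketched end to end as follows. This is only an illustration: the method name `gather_random_numbers` is hypothetical, and draining the queue with `pop until empty?` is one possible way to collect the results, safe here only because every worker has already been joined.

```ruby
require "thread" # only needed on older Rubies; Queue is built in on modern ones

# Hypothetical helper: start `worker_count` threads, let each push one
# random number into a shared queue, then drain the queue into an Array.
def gather_random_numbers(worker_count)
  sink = Queue.new

  workers = Array.new(worker_count) do
    Thread.new { sink << rand(1..100) }
  end
  workers.each(&:join) # wait until every worker has pushed its value

  # All producers are finished, so draining until empty is safe here.
  results = []
  results << sink.pop until sink.empty?
  results
end

numbers = gather_random_numbers(16)
puts numbers.size # prints 16
```
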
We want to process data in parallel. Let's populate source with some data:

    source = Queue.new
    data = (1..100)
    data.each { |e| source << e }

Then create some workers to process data:

    (1..16).to_a.map do
      Thread.new do
        until source.empty?
          item = source.pop
          sleep 0.5
          ...

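
One caveat with checking `empty?` and then calling `pop`: another worker may take the last item between the two calls, leaving this worker blocked on an empty queue. A possible variant, shown below as a sketch, uses the non-blocking form `pop(true)`, which raises `ThreadError` when the queue is empty. The method name `process_in_parallel` and the separate `results` queue are assumptions made for this example, not part of the original text.

```ruby
require "thread" # only needed on older Rubies

# Hypothetical helper: run the given block on every item using
# `worker_count` threads. Results come back in completion order,
# not in input order.
def process_in_parallel(items, worker_count, &block)
  source = Queue.new
  results = Queue.new
  items.each { |item| source << item }

  workers = Array.new(worker_count) do
    Thread.new do
      loop do
        begin
          item = source.pop(true) # non-blocking: raises ThreadError when empty
        rescue ThreadError
          break # nothing left to process
        end
        results << block.call(item)
      end
    end
  end
  workers.each(&:join)

  # Every worker is done, so the number of results is final.
  Array.new(results.size) { results.pop }
end

squares = process_in_parallel(1..100, 16) { |n| n * n }
puts squares.size # prints 100
```

This works only because the source queue is fully populated before the workers start; if producers were still running, an empty queue would not mean the work is finished.
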
We want to process data in parallel and push it down the line to be processed by other workers. Since workers both consume and produce data, we have to create two queues:

    first_input_source = Queue.new
    first_output_sink = Queue.new
    100.times { |i| first_input_source << i }

First wave of...

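
A two-stage pipeline along these lines might look like the sketch below. It is not the original example: the method name `run_pipeline`, the third queue `second_output_sink`, and the work done in each stage are made up for illustration. It also assumes Ruby 2.3 or later, where `Queue#close` exists and `pop` on a closed, empty queue returns `nil`.

```ruby
require "thread" # only needed on older Rubies

# Hypothetical two-stage pipeline: stage one doubles each number,
# stage two formats it as a string. Items must not be nil or false,
# because nil is used to detect a closed, drained queue.
def run_pipeline(inputs, workers_per_stage)
  first_input_source = Queue.new
  first_output_sink  = Queue.new # output of stage one, input of stage two
  second_output_sink = Queue.new

  inputs.each { |i| first_input_source << i }
  first_input_source.close # no more input will arrive

  first_wave = Array.new(workers_per_stage) do
    Thread.new do
      # pop returns nil once the closed queue has been emptied
      while (item = first_input_source.pop)
        first_output_sink << item * 2
      end
    end
  end

  second_wave = Array.new(workers_per_stage) do
    Thread.new do
      while (item = first_output_sink.pop)
        second_output_sink << "value: #{item}"
      end
    end
  end

  first_wave.each(&:join)
  first_output_sink.close # stage one is finished, so stage two can drain and stop
  second_wave.each(&:join)

  Array.new(second_output_sink.size) { second_output_sink.pop }
end

puts run_pipeline(0...100, 4).size # prints 100
```

Both waves run at the same time; closing each queue once its producers have finished is what lets the consumers downstream terminate instead of blocking forever.
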
    q = Queue.new
    q << "any object including another queue"
    # or
    q.push :data

There is no high-water mark; a queue can grow without bound. #push never blocks.

    q = Queue.new
    q << :data
    q.pop #=> :data

#pop will block until there is some data available. #pop can be used for synchronization.

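
When blocking is not wanted, `pop` also accepts a `non_block` argument: `pop(true)` raises `ThreadError` on an empty queue instead of waiting. The wrapper below is a small sketch of that behaviour; the name `pop_or_default` is hypothetical.

```ruby
require "thread" # only needed on older Rubies

# Hypothetical helper: return the next item, or `default` when the queue
# is empty, without ever blocking the calling thread.
def pop_or_default(queue, default = nil)
  queue.pop(true) # non_block = true raises ThreadError on an empty queue
rescue ThreadError
  default
end

q = Queue.new
q << :data
p pop_or_default(q)         # prints :data
p pop_or_default(q, :empty) # prints :empty
```
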
    syncer = Queue.new

    a = Thread.new do
      syncer.pop
      puts "this happens at end"
    end

    b = Thread.new do
      puts "this happens first"
      STDOUT.flush
      syncer << :ok
    end

    [a, b].map(&:join)

    q = Queue.new
    q << 1
    q << 2

    a = Array.new
    a << q.pop until q.empty?

Or a one-liner:

    [].tap { |array| array << q.pop until q.empty? }

To avoid blocking forever, reading from the queues should not happen on the thread where the merge happens. Likewise, to avoid extra synchronization, or waiting indefinitely on one queue while another has data, the queues should not all be read from the same thread. Let's start by defining and populating two queues:...
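
One way to follow this advice is to give every source queue its own forwarding thread, so that an empty queue never holds up data waiting in another one. The sketch below is an illustration under assumptions, not the original example: the name `merge_queues` is hypothetical, and it relies on `Queue#close` (Ruby 2.3 or later) to signal that a source is finished.

```ruby
require "thread" # only needed on older Rubies

# Hypothetical helper: forward everything from `sources` into one queue.
# Each source is read on its own thread. Items must not be nil or false,
# because nil is used to detect a closed, drained source.
def merge_queues(*sources)
  merged = Queue.new

  forwarders = sources.map do |source|
    Thread.new do
      while (item = source.pop) # nil means the source is closed and empty
        merged << item
      end
    end
  end

  # Close `merged` from a separate thread once every forwarder is done,
  # so the caller is never blocked by the merge itself.
  Thread.new do
    forwarders.each(&:join)
    merged.close
  end

  merged
end

first = Queue.new
second = Queue.new
3.times { |i| first << "a#{i}" }
3.times { |i| second << "b#{i}" }
first.close
second.close

merged = merge_queues(first, second)
items = []
while (item = merged.pop)
  items << item
end
puts items.size # prints 6
```

Items from a single source keep their relative order, but the interleaving between sources depends on thread scheduling.
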

