Thread-safe queue for sending values from producers to consumers
A Channel is a thread-safe queue that lets you send a series of objects from one or more producers to one or more consumers. Each object arrives at exactly one consumer, selected by the scheduler. If there is only one producer and one consumer, the order of objects is guaranteed to be preserved. Sending on a Channel is non-blocking.
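    my $c = Channel.new;
    # Ten tasks each sleep for a random time, then send that value.
    await (^10).map: {
        start {
            my $r = rand;
            sleep $r;
            $c.send($r);
        }
    }
    $c.close;
    say $c.list;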
Further examples can be found on the concurrency page.
method send(Channel:D: \item)
Enqueues an item into the Channel. Throws an exception of type X::Channel::SendOnClosed if the channel has already been closed. This call does not block waiting for a consumer to take the object. There is no limit on the number of items that may be queued, so take care to prevent runaway queueing.
    my $c = Channel.new;
    $c.send(1);
    $c.send([2, 3, 4, 5]);
    $c.close;
    say $c.list; # OUTPUT: «(1 [2 3 4 5])␤»
method receive(Channel:D:)
Receives and removes an item from the channel. If no item is present, it blocks until another thread sends one.
Throws an exception of type X::Channel::ReceiveOnClosed if the channel has been closed and the last item has already been removed, or if close is called while receive is waiting for an item to arrive.
If the channel has been marked as erratic with method fail and the last item has been removed, receive throws the argument that was given to fail as an exception.
See method poll for a non-blocking version that won't throw exceptions.
    my $c = Channel.new;
    $c.send(1);
    say $c.receive; # OUTPUT: «1␤»
method poll(Channel:D:)
Receives and removes an item from the channel. If no item is present, returns Nil instead of waiting.
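    my $c = Channel.new;
    Promise.in(2).then: { $c.close };
    ^10 .map({ $c.send($_) });
    loop {
        # poll returns Nil when the queue is empty, so test for definedness
        with $c.poll -> $item { $item.say }
        last if $c.closed;
        sleep 0.1;
    }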
See method receive for a blocking version that properly responds to channel closing and failure.
method close(Channel:D:)
Closes the Channel normally. This makes subsequent send calls die with X::Channel::SendOnClosed. Subsequent calls to .receive can still drain any items that were sent before the close, but once the queue is empty they throw an X::Channel::ReceiveOnClosed exception. You can produce a Seq from a Channel by contextualizing it to an array with @() or by calling the .list method; neither terminates until the channel has been closed. A whenever-block also terminates properly on a closed channel.
    my $c = Channel.new;
    $c.close;
    $c.send(1);
    CATCH { default { put .^name, ': ', .Str } };
    # OUTPUT: «X::Channel::SendOnClosed: Cannot send a message on a closed channel␤»
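The statement that a whenever-block terminates on a closed channel can be illustrated with a minimal sketch. The variable names ($c, @seen) are chosen for illustration only, and the single producer and single consumer are assumptions made so that the order of items is preserved.

```raku
my $c = Channel.new;

# Producer: sends three values from another thread, then closes the channel.
start {
    $c.send($_) for 1..3;
    $c.close;
}

# Consumer: the whenever block runs once per item. The react block exits
# once the channel has been closed and drained.
my @seen;
react {
    whenever $c -> $item {
        @seen.push($item);
    }
}
say @seen; # OUTPUT: «[1 2 3]␤»
```

If the producer never called .close, the react block would keep waiting for further items.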
Note that an exception thrown before .close is reached can prevent it from being called, which may leave the receiving thread hanging. Use a LEAVE phaser to ensure that .close is called in this case.
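As a rough sketch of the LEAVE approach, the producer below closes the channel when its block exits, even though it dies partway through. The failure is simulated with die, and the names $c, $producer, and @received are illustrative assumptions.

```raku
my $c = Channel.new;

# Producer: the LEAVE phaser runs when the block exits, whether it
# finishes normally or is unwound by the exception below.
my $producer = start {
    LEAVE $c.close;
    $c.send(1);
    die "producer failed";   # hypothetical failure, for illustration
    $c.send(2);              # never reached
}

# Without the LEAVE phaser, .list would wait forever for the channel to close.
my @received = $c.list;
say @received; # OUTPUT: «[1]␤»
```

The promise returned by start is kept in $producer so that the exception stays stored in the promise rather than being reported as unhandled.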
method list(Channel:D: --> List:D)
Returns a list based on the Seq that iterates over the items in the queue, removing each item as it is iterated. The iteration can only terminate once the close method has been called.
    my $c = Channel.new;
    $c.send(1);
    $c.send(2);
    $c.close;
    say $c.list; # OUTPUT: «(1 2)␤»
method closed(Channel:D: --> Promise:D)
Returns a promise that will be kept once the channel is closed by a call to method close.
    my $c = Channel.new;
    $c.closed.then({ say "It's closed!" });
    $c.close;
    sleep 1;
method fail(Channel:D: $error)
Closes the Channel (that is, makes subsequent send calls die) and enqueues the error as the final element in the channel. Method receive throws that error as an exception once it reaches it. Does nothing if the channel has already been closed or .fail has already been called on it.
    my $c = Channel.new;
    $c.fail("Bad error happens!");
    $c.receive;
    CATCH { default { put .^name, ': ', .Str } };
    # OUTPUT: «X::AdHoc: Bad error happens!␤»
method Capture(Channel:D: --> Capture:D)
Equivalent to calling .List.Capture on the invocant.
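method Supply(Channel:D:)
Returns an on-demand Supply that emits a value for every value received on the Channel. done is called on the Supply when the Channel is closed.
    my $c = Channel.new;
    my Supply $s1 = $c.Supply;
    my Supply $s2 = $c.Supply;
    $s1.tap(-> $v { say "First $v" });
    $s2.tap(-> $v { say "Second $v" });
    ^10 .map({ $c.send($_) });
    sleep 1;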
Multiple calls to this method produce multiple instances of Supply, which compete over the values from the Channel.
multi sub await(Channel:D)
multi sub await(*@)
Waits until each of the given channels has a value available and returns those values (it calls .receive on each channel). Also works with promises.
    my $c = Channel.new;
    Promise.in(1).then({ $c.send(1) });
    say await $c;
Since 6.d, it no longer blocks a thread while waiting.
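The slurpy form of await can be sketched with two channels. This is a minimal illustration: the channel names, delays, and payload strings are assumptions, and the values are expected to come back in the order the channels were passed, not the order in which they were sent.

```raku
my $c1 = Channel.new;
my $c2 = Channel.new;

# Each channel gets its value after a short delay; $c2 is filled first.
Promise.in(0.2).then({ $c1.send('from c1') });
Promise.in(0.1).then({ $c2.send('from c2') });

# await calls .receive on each channel and returns one value per channel.
my ($first, $second) = await $c1, $c2;
say "$first, $second"; # OUTPUT: «from c1, from c2␤»
```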