我想设置一些在一个通道上同时运行的线程,并且每个线程也应该为通道提供信息.其中一个线程将决定何时停止.但是,这是我最接近这样做的:
use Algorithm::Evolutionary::Simple;
my $length = 32;
my $supplier = Supplier.new;
my $supply = $supplier.Supply;
my $channel-one = $supply.Channel;
my $pairs-supply = $supply.batch( elems => 2 );
my $channel-two = $pairs-supply.Channel;
my $single = start {
react {
whenever $channel-one -> $item {
say "via Channel 1:", max-ones($item);
}
}
}
my $pairs = start {
react {
whenever $channel-two -> @pair {
my @new-chromosome = crossover( @pair[0], @pair[1] );
say "In Channel 2: ", @new-chromosome;
$supplier.emit( @new-chromosome[0]);
$supplier.emit( @new-chromosome[1]);
}
}
}
await (^10).map: -> $r {
start {
sleep $r/100.0;
$supplier.emit( random-chromosome($length) );
}
}
$supplier.done;
这在多次排放后停止.而且它可能不会同时运行.我使用的是通道而不是耗材和水龙头,因为它们不会同时运行,而是异步运行.我需要耗材,因为我想要一个seudo-channel,成对地获取元素,就像上面所做的那样;我没有看到用纯粹的渠道做到这一点的方式.
如果我将供应的发射量更改为通道的发送,则没有上述差异.
这里有几个问题
>这些反应块是否在不同的线程中运行?如果没有,这样做的方式是什么?
>即使它们不是,即使$pair一直在向频道发送,为什么它也会停止?
>我可以从单项渠道自动创建“批量”渠道吗?
更新1:如果我从最后消除$supplier.done,它将阻止.如果我创建一个promise in whenever
,每次读取一个,它就会阻塞并且什么都不做.
最佳答案 答案在这里,被剥离到最低限度
my Channel $c .= new;
my Channel $c2 = $c.Supply.batch( elems => 2).Channel;
my Channel $output .= new;
my $count = 0;
$c.send(1) for ^2;
my $more-work = start react whenever $c2 -> @item {
if ( $count++ < 32 ) {
$c.send( @item[1]);
my $sum = sum @item;
$c.send( $sum );
$output.send( $sum );
} else {
$c.close;
}
}
await $more-work;
loop {
if my $item = $output.poll {
$item.say
} else {
$output.close;
}
if $output.closed { last };
}
通过从通道($c.Supply)创建供应来使用每两个元素批次第一个通道的第二个通道,批量供应两个批次(批次(elems => 2))并将其转回进入一个频道.为输出创建第三个通道.
为了不耗尽电源并挂断通道,从第一个(实际上是唯一的)通道读取的每个第二个元件都放回到那里.所以第二个读取二进制的通道永远不会挂起或等待新元素.
为每个新元素创建一个输出通道,并在需要时为外部计数器完成操作;输出通道以非阻塞方式读取,并在最后一行中没有任何内容可读时关闭.
准确回答我原来的问题:
>是的,他们是,只有他们互相窃取元素.
>因为两个线程都是从同一个通道读取的.第一个偶然发现元素,读取它.
>是的,通过将通道转换为耗材,对它们进行批处理并将它们转回通道.只记住它们不是副本,它们将分享自己相同的元素.