PHP:共享公共对象的线程代理

我正在运行非线程
PHP的CGI程序,这是人工生命和进化的一个小练习.生物体具有基因组和解释器,这使得它们执行某些操作,包括在共享的世界地图中移动和相互交互.目前我通过使用通过MySQL数据库交互的多个PHP进程来保持较差的线程外观,但我想重写代码,以便它使用pthreads在单个线程中连续运行,不一定使用数据库(尽管我’ d可能想保留报告).

我一直在浏览github上的问题和解答问题和代码示例,但是没有找到任何东西 – 据我所知 – 解决了我想要的问题.因为我不是一个天才的OOP编码器,而且我是编写线程代码的新手,特别是在PHP中,我的问题相当广泛.

我试图通过编写显示我正在尝试做的代码来缩小我的问题范围,但它可能仍然过于宽泛.我对如何进一步缩小范围表示感谢.

我对以下代码的疑问:

>如何让Organism对共享的World对象进行操作,以便将World对象中的更改传递给所有线程,避免冲突并保持连贯性?
>鉴于人口规模最终是可变的,有没有办法引用世界对象的生物体部分(即$world->有机体),并让World能够创建新的生物体,如图所示下面(有缺陷的)代码?
>鉴于我最终想要创建数百个生物群体,您是否有任何关于限制活动线程数量(即限制内存/ CPU使用率)的指针,同时仍保持一致性?

下面的代码(当然没有运行,但是)说明了我想要实现的目标:

/*
 * Shared object containing a map of the world and 
 * methods for getting/setting coordinates
 */
class World extends Thread
{
    public $map;
    public $sx;
    public $sy;
    public $organisms;

    // set all map coords to 0
    public function __construct($sx, $sy, $p)
    {
        $map = array();
        for( $i = 0; $i < $sx; $i++ )
        {
            $map[$i] = array();
            for( $j = 0; $j < $sy; $j++ )
            {
                $map[$i][$j] = 0;
            }
        }
        $this->map = $map;
        $this->sx = $sx;
        $this->sy = $sy;

        // create and start organism threads
        $this->organisms = array();
        for( $i = 0; $i < $p; $i++ )
        {
            // this won't work because threaded objects created
            // within a thread scope that have no external references
            // are destroyed in the constructor
            $this->makeOrganism($i+1, $i+1, $i+1);
        }
    }

    // produces a new organism, adds to world population
    public function makeOrganism($x, $y, $value)
    {
        if( $x < 1 || $x > $this->sx ) return false;
        if( $y < 1 || $y > $this->sy ) return false;
        if( $this->getMap($x, $y) != 0 ) return false;

        echo "creating new organism $value\n";
        $organisms = $this->organisms;
        // doesn't work because the world data is decoupled in the new thread
        $organisms[] = new Organism($this, $x, $y, $value);
        $this->organisms = $organisms;

        return true;
    }

    // assumes valid coords
    public function setMap($x, $y, $value)
    {    
        return $this->map[$x-1][$y-1] = $value;
    }

    // assumes valid coords
    public function getMap($x, $y)
    {
        return $this->map[$x-1][$y-1];
    }

    public function getSX()
    {
        return $this->sx;
    }

    public function getSY()
    {
        return $this->sy;
    }

    public function run() 
    {
        for( $i = 0; $i < count($this->organisms); $i++ )
        {
            echo "starting organism ", $this->value, "\n";
            $this->organisms[$i]->start();
        }
    }
}

/*
 * Autonomously running agent accessing shared World map
 */
class Organism extends Thread
{
    public $value;
    public $world;
    public $x;
    public $y;

    public function __construct(World $world, $x, $y, $value)
    {
        $this->world = $world;
        $this->value = $value;
        $this->x = $x;
        $this->y = $y;
        // assume coordinates are empty
        $this->world->setMap($x, $y, $value);
    }

    // try to move organism by $dx, $dy
    public function move($dx, $dy)
    {
        $x = $this->x + $dx;
        $y = $this->y + $dy;
        if( $x < 1 || $x > $this->world->getSX() ) return false;
        if( $y < 1 || $y > $this->world->getSY() ) return false;
        if( $this->world->getMap($x, $y) != 0 ) return false;

        $this->world->setMap($x, $y, $this->value);
        $this->world->setMap($this->x, $this->y, 0);
        $this->x = $x;
        $this->y = $y;
        return true;
    }

    public function getValue()
    {
        return $this->value;
    }

    public function run()
    {
        // infinite loop; organisms move randomly about until they die
        while( true )
        {
            echo "running organism ", $this->getValue(), "\n";
            // this should operate on the shared object World, 
            // maintaining consistency and avoiding conflicts between threads
            $dx = rand(-1, 1);
            $dy = rand(-1, 1);
            $this->move($dx, $dy);

            // occasionally add an organism to the population by cloning this one
            if( rand(0, 100) > 95 )
            {
                $this->world->makeOrganism($this->x+1, $this->y+1, $this->value+100);
            }

            // wait random interval, organisms are
            // not expected to move all at the same time
            $this->wait(1000 + rand(500, 1500));
        }
    }
}

// initialize shared object
$world = new World(50, 50, 50);
$world->start();
$world->join();

最佳答案 我要回答pthreads v3,PHP7.

请不要将pthreads v2用于新项目,v3要优越得多.

How do I get an Organism to act on a shared World object so that the changes in the World object are communicated to all threads, avoiding conflicts and maintaining coherence?

以下代码创建了一组操作共享对象的线程:

<?php
class Test extends Thread {

    public function __construct(Threaded $shared) {
        $this->shared = $shared;
    }

    public function run() {
        $this->shared[] = $this->getThreadId();
    }
}

$shared = new Threaded();
$tests = [];
for ($i = 0; $i<20; $i++) {
    $tests[$i] = 
        new Test($shared);
    $tests[$i]->start();
}

foreach ($tests as $test)
    $test->join();

var_dump($shared);
?>

会产生类似于:

object(Threaded)#1 (20) {
  [0]=>
  int(140322714146560)
  [1]=>
  int(140322703144704)
  [2]=>
  int(140322621355776)
  [3]=>
  int(140322612963072)
  [4]=>
  int(140322604570368)
  [5]=>
  int(140322596177664)
  [6]=>
  int(140322587784960)
  [7]=>
  int(140322579392256)
  [8]=>
  int(140322570999552)
  [9]=>
  int(140322487138048)
  [10]=>
  int(140322478745344)
  [11]=>
  int(140322470352640)
  [12]=>
  int(140322461959936)
  [13]=>
  int(140322453567232)
  [14]=>
  int(140322445174528)
  [15]=>
  int(140322436781824)
  [16]=>
  int(140322428389120)
  [17]=>
  int(140322419996416)
  [18]=>
  int(140322411603712)
  [19]=>
  int(140322403211008)
}

元素数量一致并不是偶然的:

$this->shared[] = $this->getThreadId();

执行此操作时,pthreads会为您提供安全保障.

任何引用Threaded $shared对象的Thread都可以操作它.

一致性和安全性是两回事,请考虑以下代码:

<?php
class Test extends Thread {

    public function __construct(Threaded $shared) {
        $this->shared = $shared;
    }

    public function run() {
        if (!isset($this->shared[0])) {
            $this->shared[0] = $this->getThreadId();
        }
    }

    private $shared;
}

$shared = new Threaded();
$tests  = [];

for ($i = 0; $i < 16; $i++) {
    $tests[$i] = 
        new Test($shared);
    $tests[$i]->start();
}

foreach ($tests as $test)
    $test->join();
?>

您可能希望只有一个线程可以通过此路径:

$this->shared[0] = $this->getThreadId();

但是,并不能保证这一点.由于在调用isset和上面的语句之间没有锁定,因此许多线程可以自由地同时传递路径.

使用以下代码替换Test的run方法将确保一致性:

    public function run() {
        $this->shared->synchronized(function(){
            if (!isset($this->shared[0])) {
                $this->shared[0] = $this->getThreadId();
            }
        });
    }

Given that the population size is ultimately variable, is there a way to make the references to the Organisms part of the World object (ie. $world->organisms), and have World be able to create new Organisms, as shown in below (faulty) code?

这听起来违反了SOLID的基本原则.无论如何,追逐这不是一件好事.

听起来有机体应该属于主要过程,因此需要在那里构建并传递给线程或工作者或其他任何东西.

它们需要属于主进程,因为它们可能最终出现在多个Thread中.

Given that I ultimately want to create a population of hundreds of Organisms, do you have any pointers on limiting the number of active threads (ie. limiting memory/cpu usage) while still maintaining consistency?

使用pthreads提供的Pool实现.

以下是一些代码:

<?php
class Organisms extends Volatile {}

class World extends Threaded {

    public function __construct(Organisms $organisms, Volatile $grid) {
        $this->organisms = $organisms;
        $this->grid = $grid;
    }

    public function populate($organism, int $x, int $y) : bool {
        $reference = $this->getGridReference($x, $y);

        return $this->grid->synchronized(function() use($organism, $reference) {
            if (isset($this->grid[$reference]))
                return false;
            return (bool) $this->grid[$reference] = $organism;
        });
    }

    private function getGridReference(int $x, int $y) {
        return sprintf("%dx%d", $x, $y);
    }

    public function getOrganisms() { return $this->organisms; }

    private $organisms;
}

class Organism extends Threaded {

    public function __construct(World $world) {
        $this->world = $world;
    }

    public function setPosition(int $x, int $y) {
        $this->x = $x;
        $this->y = $y;
    }

    public function getWorld() { return $this->world; }

    private $world;
    private $x = -1;
    private $y = -1;
}

class OrganismPopulateTask extends Threaded {

    public function __construct(World $world, Organism $organism, int $x, int $y) {
        $this->world = $world;
        $this->organism = $organism;
        $this->x = $x;
        $this->y = $y;
    }

    public function run() {
        if ($this->world->populate(
            (object) $this->organism, $this->x, $this->y)) {
                $this->organism->setPosition($this->x, $this->y);
        }
    }

    private $world;
    private $organism;
    private $x;
    private $y;
}

$organisms = new Organisms();
$grid = new Volatile();
$world = new World($organisms, $grid);
$pool = new Pool(16);

$organisms[] = new Organism($world);
$organisms[] = new Organism($world);

$pool
    ->submit(new OrganismPopulateTask($world, $organisms[0], 10, 10));
$pool   
    ->submit(new OrganismPopulateTask($world, $organisms[1], 10, 10));

$pool->shutdown();

var_dump($world);
?>

将产量:

object(World)#3 (2) {
  ["organisms"]=>
  object(Organisms)#1 (2) {
    [0]=>
    object(Organism)#5 (3) {
      ["world"]=>
      *RECURSION*
      ["x"]=>
      int(10)
      ["y"]=>
      int(10)
    }
    [1]=>
    object(Organism)#6 (3) {
      ["world"]=>
      *RECURSION*
      ["x"]=>
      int(-1)
      ["y"]=>
      int(-1)
    }
  }
  ["grid"]=>
  object(Volatile)#2 (1) {
    ["10x10"]=>
    object(Organism)#5 (3) {
      ["world"]=>
      *RECURSION*
      ["x"]=>
      int(10)
      ["y"]=>
      int(10)
    }
  }
}

注意:这使用v3.1.2中包含的功能

其中大多数应该是自我解释的,显式强制转换是为了避免因尝试连接已经消失的对象而导致的异常.

需要注意的主要事项是每个“动作”都被视为“任务”并提交给池.

点赞