Content with Style

Web Technique

PHP worker processes with Beanstalk and Daemontools

by Pascal Opitz on March 31 2010, 22:26

Before I get started on this one I want to apologize for being slack the last couple of months. Maybe being on holiday for two months slightly corrupted my morale, or maybe that was down to the nice wine tasting sessions in the Barossa Valley and the McLaren Vale. In any case, it's been a long time. I have been back for one month but I am still finding it hard to live up to my own expectations in terms of writing and tech research.

But after moaning, on with the programme:

Sometimes a task is just too heavy for a straightforward approach: memory usage might be too high, or the response to the user might be delayed. In that case it can make sense to queue the task up for later execution.

A message queue

Beanstalkd is a very easy to use message queue. There are client libraries for it in many languages, and it seems to be very popular amongst the Ruby crowd.

Installing it on OS X using MacPorts is easy peasy:

sudo port install beanstalkd

Kicking it off is just as easy:

beanstalkd -d -l 127.0.0.1 -p 11300

Pushing things into the queue

We're using pheanstalk as the client library to connect to beanstalkd, and this script is just generating 1000 dummy jobs to be picked up later by our daemonized worker process.

<?php
require_once('pheanstalk/pheanstalk_init.php');
$pheanstalk = new Pheanstalk('127.0.0.1:11300');

// push 1000 dummy jobs into the 'test' tube
for($i=0; $i<1000; $i++) {
  $job = new stdClass();
  $job->envelope_id = rand();
  $job->date = date('Y-m-d H:i:s');
  // beanstalkd stores the job body as a plain string, so serialize to JSON
  $job_data = json_encode($job);
  $pheanstalk->useTube('test')->put($job_data);
  echo "pushed: " . $job_data . "\n";
}
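
Since the job body is just a JSON string, both sides have to agree on its shape. Here is a minimal sketch of how the encoding and decoding could be wrapped up so that a malformed payload is caught early. The function names encodeJob and decodeJob are made up for illustration and are not part of pheanstalk.

```php
<?php
// Hypothetical helper: build the JSON payload that gets put on the queue.
function encodeJob($envelopeId, $date) {
  return json_encode(array('envelope_id' => $envelopeId, 'date' => $date));
}

// Hypothetical helper: decode a payload again.
// Returns null if the string is not valid JSON or a field is missing.
function decodeJob($payload) {
  $job = json_decode($payload, false);
  if (!is_object($job) || !isset($job->envelope_id, $job->date)) {
    return null;
  }
  return $job;
}
```

With something like this in place, whoever picks up the job can check for null and decide whether to skip or log the broken payload instead of failing halfway through.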

Picking up things from the queue

Our worker script now needs to connect to the queue and pick up the jobs. Everything gets dumped into a logfile, which we can keep an eye on to see if it's running alright. To stop memory leaks from growing without bound, the script terminates itself when it hits a certain memory threshold. The threshold here is picked purely for demo purposes, and the counter and the done_jobs array are only there to increase the memory footprint.

<?php
class Worker {
  
  private $path;
  private $pheanstalk;

  public function __construct($path) {
    $this->setBasePath($path);
    $this->log('starting');
    require_once('pheanstalk/pheanstalk_init.php');
    $this->pheanstalk = new Pheanstalk('127.0.0.1:11300');
  }
   
  public function __destruct() {
    $this->log('ending');
  }
  
  private function setBasePath($path) {
    $this->path = $path;
  }

  public function run() {
    $this->log('starting to run');
    $cnt = 0;
    $done_jobs = array();

    while(1) {
      // reserve() blocks until a job is available on the 'test' tube
      $job = $this->pheanstalk->watch('test')->ignore('default')->reserve();
      $job_decoded = json_decode($job->getData(), false);
      $done_jobs[] = $job_decoded; // kept only to grow the memory footprint for the demo
      $this->log('job:'.print_r($job_decoded, 1));
      $this->pheanstalk->delete($job);
      $cnt++;

      $memory = memory_get_usage();

      $this->log('memory:' . $memory);

      // demo threshold of roughly 1 MB; the supervisor restarts the script after exit
      if($memory > 1000000) {
        $this->log('exiting run due to memory limit');
        exit;
      }

      usleep(10);
    }
  }
  
  private function log($txt) {
    file_put_contents($this->path . '/log/worker.txt', $txt . "\n", FILE_APPEND);
  }
}

$worker = new Worker(dirname($argv[0]));
$worker->run();
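
The hardcoded threshold is fine for a demo, but a real worker would probably derive it from PHP's own memory_limit setting. The following is a rough sketch of that idea, not something taken from the demo code: shorthandToBytes and overMemoryBudget are hypothetical names, and the 80% fraction is an arbitrary assumption.

```php
<?php
// Hypothetical helper: convert a php.ini shorthand size ("128M", "1G", "512K") to bytes.
function shorthandToBytes($value) {
  $value = trim($value);
  if ($value === '' || $value === '-1') {
    return -1; // -1 means "no limit" in php.ini
  }
  $unit = strtolower(substr($value, -1));
  $number = (int) $value; // the cast keeps the leading digits, e.g. "128M" becomes 128
  switch ($unit) {
    case 'g': return $number * 1024 * 1024 * 1024;
    case 'm': return $number * 1024 * 1024;
    case 'k': return $number * 1024;
    default:  return $number; // plain number of bytes
  }
}

// Hypothetical check: true once usage exceeds the given fraction of the limit.
function overMemoryBudget($usedBytes, $limitBytes, $fraction = 0.8) {
  if ($limitBytes < 0) {
    return false; // no limit configured, nothing to compare against
  }
  return $usedBytes > $limitBytes * $fraction;
}
```

Inside run(), the check could then read overMemoryBudget(memory_get_usage(), shorthandToBytes(ini_get('memory_limit'))) in place of the fixed number. Bear in mind that the CLI often runs without a memory limit, in which case this sketch never triggers and a fixed threshold is still needed.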

Daemonize the worker process

Now the only problem we have is keeping the worker process running. Daemontools is a collection of tools that can supervise processes and restart them when they stop.

Installing daemontools

Again, installing on OS X is pretty straightforward:

sudo port install daemontools

We also need a shell script called run for daemontools to supervise. Usually these go into a subfolder of /service, but the MacPorts installation uses /opt/local/var/svscan/service instead. I chose to create a subfolder in my application and then symlink it into there. The shell script itself is pretty simple:

#!/bin/sh
# exec replaces the shell with PHP, so daemontools supervises the worker itself
exec php ../worker.php

Now launch daemontools and you're up and away. Kick off the push script and the worker activity should show up nicely in the logfile.

I am using daemontools here, but there are other tools that do the same job. The best option seemed to be supervisord, but its download page was down when I did the proof of concept, so I had to settle for something else. Supervisord also seems to be able to watch the memory footprint of a task, so the memory check in the worker script might be obsolete with it.

There are also options to use init.d scripts, but my knowledge of that is lacking. If you cannot be bothered to daemonize, you could also just kick off a cron job.
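
For the cron route, the worker should not loop forever but work through a limited batch and then exit. Here is a rough sketch of such a loop, kept independent of the queue client so it can be tried without beanstalkd. The name runBatch and both callbacks are hypothetical. With pheanstalk, the $reserve callback would have to wrap a reserve call with a timeout so that it returns null once the queue is empty, and that wiring is an assumption I have not shown here.

```php
<?php
// Hypothetical batch runner for a cron-driven worker.
// $reserve returns the next payload, or null when nothing is left.
// $handle processes a single payload.
// Returns the number of jobs that were handled.
function runBatch($reserve, $handle, $maxJobs) {
  $done = 0;
  while ($done < $maxJobs) {
    $payload = call_user_func($reserve);
    if ($payload === null) {
      break; // queue is empty, leave the rest to the next cron run
    }
    call_user_func($handle, $payload);
    $done++;
  }
  return $done;
}

// Usage with a plain array standing in for the queue.
$queue = array('job-1', 'job-2', 'job-3');
$handled = array();
$count = runBatch(
  function () use (&$queue) { return array_shift($queue); },
  function ($payload) use (&$handled) { $handled[] = $payload; },
  2
);
```

Because the script ends after every batch, leaked memory goes away with the process, which is the same effect the memory threshold has in the supervised version.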

I hope this is a helpful little writeup. Feel free to download the nicely zipped up demo code, and do drop a comment if you have anything to add.

Comments

  • Hi

    I just came across your post while I was looking for ways to save queue data in case of a system crash. Your post suggests creating a separate txt file to dump data for a future restore. I am not yet clear about it. I started beanstalk with the -b switch and specified a directory. I pushed the data, but I did not find any data in the folder.

    by Adnan on November 8 2010, 05:56

  • Yes, start beanstalkd with the -b option. This won't be a human-readable text file though, but a binary log. Make sure you have the right permissions etc. How about killing the process and restarting it to see if the jobs are still in the queue?

    If you want to inspect queues etc. via the command line, you can use beanspector, which comes with my nodestalker library.

    by Pascal Opitz on November 8 2010, 06:38