Catalyst Model #9: TheSchwartz

Published · Thursday, 23 July 2009 (Updated · 25 February 2011)

TheSchwartz is a job queue management package. Essentially you post jobs to it, it stores them in a database, worker packages you write run them in the background, and it reports on the results.

There are a couple of caveats about it. 1) It fails most test installations right now so you’re probably going to have to force install it. 2) Its documentation kinda sucks. The latter isn’t because the documents are bad per se but they are sparse and missing extremely important information for a package with this many moving parts. For example: nowhere in the Pod is setting up the database described. The docs don’t even tell you where the SQL to do so is. More on that—heh-heh-heh, I typed “moron”—in a bit.

To use TheSchwartz we need–

  • TheSchwartz, duh.
  • A DB (or DBs, it’s quite flexible) for the storage.
  • A job class that runs our code and sets TheSchwartz job settings by overriding inherited subs.
  • A stiff drink. A tall one. Uh, how did that get in here? I meant to type–
  • A job running script: a daemon, or a cronjob, probably.

Here is a better write-up than I was going to give: Notes on TheSchwartz, from Gavin Carr.

Install TheSchwartz + dependencies

cpan TheSchwartz
# or sadly, you might need–
cpan -fi TheSchwartz
# or follow Gavin Carr’s advice above and get it from sixapart’s svn.

We are going to use Digest::MD5 to create a unique check hash of a job by its data so that TheSchwartz will deduplicate jobs for us. We are going to use Email::Valid to bake cookies. Kidding; it checks that what the user typed is a well-formed email address.

You probably already have JSON::XS from #5: Stock quotes. We are using it to serialize the data we’re using Digest::MD5 to hash. You also should have installed DBD::SQLite already for #7: Page view counter/tracker.

cpan Email::Valid Digest::MD5
cpan JSON JSON::XS DBD::SQLite
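
A quick sketch of the hashing idea, since there is one gotcha worth knowing. Perl does not promise any particular hash key order, so once the job data has more than one key, the same data can serialize to different JSON strings and yield different MD5s. Asking the encoder for canonical (sorted key) output avoids that. I'm using JSON::PP here only because it ships with Perl; JSON::XS offers the same canonical option. The dedupe_token name and the sample data are mine, purely for illustration.

```perl
use strict;
use warnings;
use Digest::MD5 qw( md5_hex );
use JSON::PP ();

# canonical() sorts hash keys, so equal data always serializes identically.
my $json = JSON::PP->new->utf8->canonical;

# Hypothetical helper: turn a job's data into a stable de-duplication token.
sub dedupe_token {
    my $job_data = shift;
    return md5_hex( $json->encode($job_data) );
}

# Same data, keys written in a different order.
my $first  = dedupe_token( { email => 'someone@example.com', kind => "reminder" } );
my $second = dedupe_token( { kind => "reminder", email => 'someone@example.com' } );

print "Tokens match\n" if $first eq $second;
```

With a single key, as in the controller further down, the ordering can't vary, so plain encode_json is fine there.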

I would like to mention that simple scheduling has been solved quite well if a bit eccentrically in the *nix cron tool. Config::Crontab might be a better choice as a model/service engine than TheSchwartz for many uses. TheSchwartz is probably the better choice when you need unified job reporting and your jobs are generally triggered by user actions and not things that need to run at regular intervals.

We are going to auto-deploy our DB again. Instead of doing a DBIC->deploy, this time we have a script and the SQL schema to do it.

If you want to know how the heck I got the stuff set up, the only tricky part was finding the SQL definitions. I’ve done this twice, once for MySQL, which is not what we are doing here. If you want to do that or PostgreSQL, visit the main page for TheSchwartz. Click on “[Browse]” up at the top. Click on the “/doc” dir in the directory listing. And you should see, among other things, the prizes–

  • schema-postgres.sql
  • schema.sql

Deploy the database to SQLite

For this model I found an SQLite schema in one of Brad Fitzpatrick’s github entries. This is it–

emacs etc/theschwartz-sqlite-schema.sql
CREATE TABLE funcmap (
        funcid INTEGER PRIMARY KEY AUTOINCREMENT,
        funcname VARCHAR(255) NOT NULL,
        UNIQUE(funcname)
);

CREATE TABLE job (
        jobid INTEGER PRIMARY KEY AUTOINCREMENT,
        funcid INTEGER UNSIGNED NOT NULL,
        arg MEDIUMBLOB,
        uniqkey VARCHAR(255) NULL,
        insert_time INTEGER UNSIGNED,
        run_after INTEGER UNSIGNED NOT NULL,
        grabbed_until INTEGER UNSIGNED NOT NULL,
        priority SMALLINT UNSIGNED,
        coalesce VARCHAR(255),
        UNIQUE(funcid,uniqkey)
);

CREATE TABLE error (
        error_time INTEGER UNSIGNED NOT NULL,
        jobid INTEGER NOT NULL,
        message VARCHAR(255) NOT NULL,
        funcid INT UNSIGNED NOT NULL DEFAULT 0
);

CREATE TABLE exitstatus (
        jobid INTEGER PRIMARY KEY NOT NULL,
        funcid INT UNSIGNED NOT NULL DEFAULT 0,
        status SMALLINT UNSIGNED,
        completion_time INTEGER UNSIGNED,
        delete_after INTEGER UNSIGNED
);

And here is a deploy script for it to set up the DB for the model to use.

emacs bin/deploy-theschwartz.pl
use strict;
use warnings;
use Path::Class::File;
use File::Spec;

my $self = Path::Class::File->new( File::Spec->rel2abs( $0 ) );
my $app_dir = $self->parent->parent;

my $database = Path::Class::File->new( $app_dir,
                                       "etc",
                                       "theschwartz.sqlt" );

my $schema = Path::Class::File->new( $app_dir,
                                     "etc",
                                     "theschwartz-sqlite-schema.sql" );

-e $schema or die "Schema does not exist: $schema";

$database->remove if -e $database;

0 == system("sqlite3 $database < $schema")
    or die "Couldn't load schema into db: sqlite3 $database < $schema";

After setting it up, don’t forget to run it–

perl bin/deploy-theschwartz.pl

Your DB should be good to go. Check it out yourself to be sure–

sqlite3 etc/theschwartz.sqlt
SQLite version 3.4.0
Enter ".help" for instructions
sqlite> .tables
error       exitstatus  funcmap     job         note 

Try .schema or .dump to inspect the DB. .q will get you out of the sqlite prompt and there’s .help too.
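
The deploy script shells out to the sqlite3 command-line tool. If you don't have that binary handy, here is a rough sketch of doing the same load through DBI and DBD::SQLite instead. It is an alternative I'm suggesting, not what the script above does. The split_statements and deploy names are made up for this example, and the statement splitting is naive: it assumes no semicolons inside string literals, which holds for the schema shown earlier. Unlike the script above it does not remove an existing database file first.

```perl
use strict;
use warnings;

# Split schema text into single statements; DBI's do() wants one at a time.
sub split_statements {
    my $sql = shift;
    return grep { length }
           map  { my $s = $_; $s =~ s/^\s+|\s+$//g; $s }
           split /;/, $sql;
}

# Hypothetical helper: load a schema file into an SQLite database file.
sub deploy {
    my ( $db_file, $schema_file ) = @_;
    require DBI;    # Loaded lazily; needs DBD::SQLite installed.

    open my $fh, "<", $schema_file or die "Can't read $schema_file: $!";
    my $sql = do { local $/; <$fh> };
    close $fh;

    # DBD::SQLite creates the database file on connect if it is missing.
    my $dbh = DBI->connect( "dbi:SQLite:dbname=$db_file", "", "",
                            { RaiseError => 1 } );
    $dbh->do($_) for split_statements($sql);
    $dbh->disconnect;
    return;
}

# Usage: perl this-script.pl etc/theschwartz.sqlt etc/theschwartz-sqlite-schema.sql
deploy(@ARGV) if @ARGV == 2;
```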

Create your model for running TheSchwartz

./script/myapp_create.pl model TheSchwartz
 exists "/Users/apv/depot/sites/10in10/script/../lib/MyApp/Model"
 exists "/Users/apv/depot/sites/10in10/script/../t"
created "/Users/apv/depot/sites/10in10/script/../lib/MyApp/Model/TheSchwartz.pm"
created "/Users/apv/depot/sites/10in10/script/../t/model_TheSchwartz.t"

This is a case where we can use the Catalyst::Model::Adaptor like we did in #5: Stock quotes with a similar tweak to the argument handling.

emacs lib/MyApp/Model/TheSchwartz.pm
package MyApp::Model::TheSchwartz;
use parent "Catalyst::Model::Adaptor";
__PACKAGE__->config( class => "TheSchwartz" );

sub mangle_arguments { %{$_[1]} }

1;
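
Why the one-line mangle_arguments? As far as I can tell, Catalyst::Model::Adaptor hands the args hash reference from the config to the adapted class's constructor as a single argument by default, while TheSchwartz->new wants a flat list of key/value pairs. Here is a tiny sketch of the flattening. Hypothetical::Queue is a made-up stand-in for TheSchwartz so this runs without it installed, and the DSN is a placeholder.

```perl
use strict;
use warnings;

{
    # Stand-in class: like TheSchwartz, its constructor takes key/value pairs.
    package Hypothetical::Queue;
    sub new {
        my ( $class, %args ) = @_;
        return bless {%args}, $class;
    }
}

# Same body as the model's method: $_[0] is the invocant, $_[1] is the
# "args" hash reference from the config. Dereferencing flattens it to a list.
sub mangle_arguments { %{ $_[1] } }

my $args = {
    verbose   => 1,
    databases => [ { dsn => "dbi:SQLite:example.sqlt" } ],
};

my $queue = Hypothetical::Queue->new( main->mangle_arguments($args) );
print $queue->{databases}[0]{dsn}, "\n";
```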

Configure the model to find the DB we deployed manually

emacs myapp.yml
Model::TheSchwartz:
  args:
    verbose: 1
    databases:
      - dsn: dbi:SQLite:__path_to(etc/theschwartz.sqlt)__

Write the worker module: MyApp::Job::Reminder

emacs lib/MyApp/Job/Reminder.pm
package MyApp::Job::Reminder;
use strict;
use warnings;
no warnings "uninitialized";
use parent qw( TheSchwartz::Worker );
use TheSchwartz::Job;
use MIME::Lite;
use Sys::Hostname "hostname";

my $DEBUG = 0;

sub work {
    my $class = shift;
    my TheSchwartz::Job $job = shift;
    my $msg = MIME::Lite
        ->new(
              From    => 'bit-bucket@' . hostname(),
              To      => $job->arg->{email},
              Subject => "Something important!",
              Type    => "text/plain",
              Data    => "This is your reminder to visit again."
             );

    if ( $DEBUG )
    {
        open my $f, ">>", "/tmp/10in10.log" or die $!;
        print $f "Not sending message!\n--\n";
        print $f $msg->as_string, "\n\n";
        close $f;
    }
    else
    {
        $msg->send;
    }
    $job->completed();
}

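# From our parent class we can override these. All durations are in seconds.
sub keep_exit_status_for { 60 * 60 * 24 * 1 } # Keep exitstatus rows for a day.
sub max_retries { 1 }                         # Retry a failed job once.
sub retry_delay { 30 }                        # Wait 30 seconds before the retry.
sub grab_for { 60 * 3 }                       # A worker may hold a job for 3 minutes.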

1;
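
One thing the worker above glosses over: it calls completed no matter what send returned. If your mail transport can return false rather than die, you may want to tell TheSchwartz the job failed so max_retries and retry_delay get a chance to do something. A minimal sketch of that idea follows. The report_delivery helper and Hypothetical::Job are my own inventions for illustration; the only TheSchwartz::Job methods it relies on are completed and failed.

```perl
use strict;
use warnings;

{
    # Stand-in for TheSchwartz::Job so the sketch runs on its own.
    package Hypothetical::Job;
    sub new       { bless { log => [] }, shift }
    sub completed { push @{ $_[0]{log} }, "completed" }
    sub failed    { push @{ $_[0]{log} }, "failed: $_[1]" }
}

# Hypothetical helper: run one delivery attempt and report the outcome.
# $send is a code ref that returns true on success and may die on error.
sub report_delivery {
    my ( $job, $send ) = @_;
    my $sent = eval { $send->() };
    if ($sent) {
        $job->completed();
    }
    else {
        my $reason = $@ || "send returned false";
        $job->failed("Reminder not sent: $reason");
    }
    return $sent ? 1 : 0;
}

# Simulate a transport that blows up.
my $job = Hypothetical::Job->new;
report_delivery( $job, sub { die "no mail transport\n" } );
print "$_\n" for @{ $job->{log} };
```

Inside work, the call would look something like report_delivery( $job, sub { $msg->send } ) in place of the bare send and completed pair.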

Write the controller to put jobs on the queue

./script/myapp_create.pl controller Reminder
emacs lib/MyApp/Controller/Reminder.pm
package MyApp::Controller::Reminder;
use strict;
use warnings;
use base 'Catalyst::Controller';
use TheSchwartz::Job;
use JSON::XS qw( encode_json );
use Email::Valid ();
use Digest::MD5 qw( md5_hex );
use DBI;

sub index :Path Args(0) {
    my ( $self, $c ) = @_;

    if ( my $email = $c->request->body_params->{email} )
    {
        if ( Email::Valid->address($email) )
        {
            my $job_data = { email => $email };
            my $de_dupe_token = md5_hex(encode_json($job_data));
    
            my $job = TheSchwartz::Job->new(
                                            funcname => "MyApp::Job::Reminder",
                                            uniqkey  => $de_dupe_token,
                                            coalesce => $email,
                                            arg      => $job_data,
                                            );

            my $job_handle = $c->model("TheSchwartz")->insert($job);
            # Returns undef for duplicate jobs.
            $c->stash(job => $job_handle,
                      job_data => $job_data);
        }
        else
        {
            $c->stash(problem => "$email is not a valid address :(");
        }
    }
}

1;

Nota bene: this is not the right way to do a form. You should redirect to GET after a POST. It complicates the example and would require sessions to do here so we omit it. I would never leave it out of a production app. I would also use HTML::FormFu with Catalyst::Controller::HTML::FormFu for any form handling. There are many ways to do form handling and it’s not too hard to roll your own. You should settle on a standard early though and FormFu is the best available today.

Make a template for the view to find

mkdir root/alloy/reminder
emacs root/alloy/reminder/index.tt
<h1>This is the reminder page</h1>

[%-IF job %]
<h3>Successfully submitted your reminder job!</h3>
[%-ELSIF job_data %]
<h3 class="alert">Did not submit your reminder job. Duplicate?</h3>
[%-END %]

<form id="form" method="post" enctype="application/x-www-form-urlencoded"
 style="margin:0 5% 25px 5%"
 action="[% c.req.uri.path %]">
  <fieldset>
    <legend>Get a reminder?</legend>
    [%-IF problem %]
      <p class="alert">[% problem | html %]</p>
    [%-END %]
    <label>
      Your email address
      <input type="text" name="email" value="[% c.req.param("email") | html %]" />
      <input type="submit" name="remind" value="Remind me!" />
    </label>
  </fieldset>
</form>

The job runner

I like to keep my application scripts in __path_to(bin)__ instead of __path_to(script)__ with the Catalyst helpers and service runners. That way I know what’s mine and can let Catalyst and its helpers deal with one and I can deal with t’other. So–

mkdir bin
emacs bin/reminder-runner.pl
#!/usr/bin/env perl
use strict;
use warnings;
use TheSchwartz;
use Path::Class::File;
use File::Spec;
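use YAML ();
use FindBin qw( $Bin );
use lib "$Bin/../lib"; # Find MyApp::Job::Reminder no matter where we run from.
use MyApp::Job::Reminder;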

my $self = Path::Class::File->new( File::Spec->rel2abs( $0 ) );
my $app_dir = $self->parent->parent;
my $config_file = Path::Class::File->new($app_dir, "myapp.yml" );

# A bit of custom config riding/hacking to use the application's
# config for the DB.
my $config_data = YAML::LoadFile( $config_file );
my $args = $config_data->{"Model::TheSchwartz"}->{args};
$args->{databases}->[0]->{dsn} =~ s/__path_to\(([^)]+)\)__/ _path_to($1)  /e;

my $client = TheSchwartz->new(%{$args});

$client->can_do("MyApp::Job::Reminder");

$client->work();

sub _path_to {
    Path::Class::File->new($app_dir,+shift);
}

There is a fun trick in there to let the runner use the same configuration the application does. You can see how sane (some of) the Catalyst internals are with the path_to stuff. It was trivial to implement a local version of it so we can use the app’s config without running the app.
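
If the substitution in the runner looks like line noise, here it is pulled out on its own. This is only a sketch of the same idea: expand_path_to is a name I made up, /srv/myapp is a placeholder application root, and I use the core File::Spec module instead of Path::Class so it runs on a bare Perl install.

```perl
use strict;
use warnings;
use File::Spec;

# Hypothetical helper: replace every __path_to(some/relative/path)__ marker
# in a config value with that path resolved under the application root.
sub expand_path_to {
    my ( $value, $root ) = @_;
    $value =~ s{__path_to\(([^)]+)\)__}
               { File::Spec->catfile( $root, split m{/}, $1 ) }ge;
    return $value;
}

my $app_dir = "/srv/myapp";    # Placeholder application root.
my $dsn     = expand_path_to( "dbi:SQLite:__path_to(etc/theschwartz.sqlt)__",
                              $app_dir );
print "$dsn\n";
```

The /e flag is what makes the replacement side run as code, and /g is there in case a value carries more than one marker.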

Run away, as it were

./script/myapp_server.pl -r -d -p 3000

You’ll also need to start the job runner: bin/reminder-runner.pl. Without it, your reminder jobs will just stack up in the DB.

./bin/reminder-runner.pl

Note that problems with TheSchwartz are difficult to diagnose. If you’re getting stuck, then running your app this way might help–

# tcsh and friends.
setenv DBI_TRACE 2 && ./script/myapp_server.pl -r -d -p 3000
# bash and company.
export DBI_TRACE=2 && ./script/myapp_server.pl -r -d -p 3000

Tomorrow is the last one! Catalyst Model #10: Fixing your legacy code by not fixing it.



