Jonkで作るおれおれJobQueueフレームワーク

2010-12-02

こんにちわこんにちわ!YAPC::Asia 2010でベストスピーカー賞(次点)を頂いた nekokak です。
今回は拙作 Jonk でオレオレJobQueueフレームワークを作るお話をします。

はじめに

なぜJonkをつくったのかについては、JobQueueManagerについて
ご覧頂ければと思います。
簡単に言うと、やりたい事って案件によりけりだから、自分でその案件にあったJobQueueフレームワークつくれたほうがいいよね!
です。

install

$ cpanm Jonk

だけです。依存モジュールはTry::Tinyというモジュール一個だけなので、
全く問題無く入るでしょう。
依存モジュールが殆ど無いのがJonkの売りの一つです。
JobQueueを管理するためだけに、よくわからないORMがインストールされてしまったり
よく分からないモジュールがインストールされるのってやじゃないですか:)

Jonkが依存しているTry::TinyはPlackでも利用されている由緒正しきモジュールなのでご安心下さい。

setup

JonkはJobの管理でRDBMSを利用します。
対応しているRDBMSはMySQL/PostgreSQL/SQLiteの3種類です。

Jonk.pmのPOD中のSCHEMAセクションに各RDBMS毎のschemaがあるので、それをつかってDBを作ってください。
database名などはご自由に。

以上でJonkを使う準備が整いましたので、さっそオレオレJobQueueフレームワークを作りましょう。

簡単クローラーを作る

よくある単純なクローラーを作ってみます。
超単純なので実際に作る場合は色々考えて作ってください。

fetchしてくるurlをenqueueする

まずfetchしてくるURLを登録するクライアントを作成します。

$ cat ./client.pl
#! perl
use strict;
use warnings;
use DBI;
use Jonk::Client;

my $url = shift or die '$0 <url>';
my $dbh = DBI->connect('dbi:SQLite:./sample.db','','');
my $client = Jonk::Client->new($dbh);
$client->enqueue('MyFetcher', $url);

クライアントスクリプトとしてはこんな感じで

$ perl ./client.pl http://example.com/

とするだけで、http://example.com/
をクロール対象とするJobをenqueueすることができます。

worker起動スクリプトを作成する

workerのモデルとしては、forkモデルで複数プロセスでfetchしてくる感じです。
forkでのプロセス管理でParallel::Preforkを利用します。

$ cat ./worker.pl
use strict;
use warnings;
use Parallel::Prefork;
use DBI;
use Jonk::Worker;
use MyFetcher;

my $pm = Parallel::Prefork->new({
    max_workers       => 10,
    trap_signals      => {
        TERM => 'TERM',
        HUP  => 'TERM',
    },
});

while ($pm->signal_received ne 'TERM') {
    $pm->start and next;

    my $dbh = DBI->connect('dbi:SQLite:./sample.db','','');
    my $jonk = Jonk::Worker->new($dbh => {functions => [qw/MyFetcher/]});

    while (1) {
        if (my $job = $jonk->dequeue) {
            MyFetcher->work($job);
        } else {
            sleep(3); # wait for 3 sec.
        }
    }

    $pm->finish;
}

$pm->wait_all_children();

worker起動スクリプトとしてはこんな感じです。
これで10子プロセスが生成され、10人のworkerが随時Jonkからjobを取り出し
MyFetcherにJobの情報を渡して処理させます。

MyFetcherはこんなかんじでしょうか

package MyFetcher;
use strict;
use warnings;
use LWP::UserAgent;

sub work {
    my ($class, $job) = @_;
    my $ua = LWP::UserAgent->new;
    my $res = $ua->get($job->{arg});

    print $$;
    if ($res->is_success) {
        print $res->decoded_content;  # or whatever
    } else {
        print $res->status_line;
    }
}

1;

簡単でしょ?

今回は超簡単なサンプルスクリプトでご説明しました。
サンプルコードも
https://github.com/perl-users-jp/perl-advent-calendar/tree/master/2010/hacker/jonk_sample/
に置いておきましたのでどうぞ。

このようにJonkのをつかえばオレオレJobQueueフレームワークを作成することは簡単です。
これで既存のJobQueueエンジンに無理やり合わせて変なコードを書かないでよくなり、
案件に応じて柔軟なフレームワークを作成してみましょう!


明日はkazeburoさんのターンです!