日向夏特殊応援部隊

俺様向けメモ

Note of MogileFS #08 - Inside MogileFS architecture (1)

はじめに…の前に

これを書き途中で2回もFireFoxが落ちた。しかもはてなのバックアップ機能を有効にしたけど、無意味だった罠。orz...
hatena-mode.elを本気で使いたくなってきた。*1

はじめに

シーケンス図もどきを書こうと思ったんですが、良く考えたら自信を持って書けない事が判明したので、泣く泣くソースを読む事にしました。*2


と言う訳で元の動機がその程度なので余り深く突っ込まない予定…だけど突っ込まざるを得ないか。


特にdatabaseに対して保存しておく情報のやり取りに関しては基本、触れない予定です。


題材としてはファイルの保存、レプリケーション周りを見てみます。

MogileFS::Client->new_file

保存系の操作は最終的にこのメソッドに行き着きます。
この辺りに関してはd:id:ZIGOROu:20061016:1161034490を参照。

sub new_file {
    my MogileFS::Client $self = shift;
    return undef if $self->{readonly};

    my ($key, $class, $bytes, $opts) = @_;
    $bytes += 0;
    $opts ||= {};

    my $res = $self->{backend}->do_request
        ("create_open", {
            domain => $self->{domain},
            class  => $class,
            key    => $key,
            fid    => $opts->{fid} || 0, # fid should be specified, or pass 0 meaning to auto-generate one
            multi_dest => 1,
        }) or return undef;

backendってのはMogileFS::Backendクラスで、その中のメソッドdo_requestにコマンドcreate_openってのを渡しています。

MogileFS::Backend->do_request

まぁやってる事は想像に難くないんですが、面白そうなので覗いて見ましょう。

sub do_request {
    my MogileFS::Backend $self = shift;
    my ($cmd, $args) = @_;

    _fail("invalid arguments to do_request")
        unless $cmd && $args;

    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;

    my $sock = $self->{sock_cache};
    my $argstr = _encode_url_string(%$args);
    my $req = "$cmd $argstr\r\n";
    my $reqlen = length($req);
    my $rv = 0;

    if ($sock) {
        # try our cached one, but assume it might be bogus
        _debug("SOCK: cached = $sock, REQ: $req");
        $rv = send($sock, $req, $FLAG_NOSIGNAL);
        if ($! || ! defined $rv) {
            # undef is error, but $! may not be populated, we've found
            undef $self->{sock_cache};
        } elsif ($rv != $reqlen) {
            return _fail("send() didn't return expected length ($rv, not $reqlen)");
        }
    }

local $SIG{'PIPE'} = "IGNORE" なんて初めて見ましたな。このシグナル自体初耳なんですけどw*3


socketハンドルのキャッシュがある場合だけで十分ですけど、

my $argstr = _encode_url_string(%$args);
my $req = "$cmd $argstr\r\n";

の部分、

$rv = send($sock, $req, $FLAG_NOSIGNAL);

だけで十分何やってるか分かりますね。
全部リクエストメソッドに乗せちゃってる。


その後のレスポンスのパース部分+を見る限りは、レスポンス自体もレスポンスコード部分で凝縮されてるみたいです。

error
ERR
ok
OK

こんなフォーマットレスポンスが返ってくるみたいですね。


ちなみに接続先は当たり前ですけど、/etc/mogilefs/mogilefsd.confで指定したconf_portで指定されているmogilefsdに行きます。

mogilefsdコマンド

まず注意しなければいけないのは、このソース中で二回package宣言があり、Mgd, MogileFSって言う二個のネームスペースが割り当てられています。*4


冒頭の起動処理を見てみましょう。

MogileFS::Config->load_config;

# don't run as root
die "mogilefsd cannot be run as root\n"
    if $< == 0 && MogileFS->config('user') ne "root";

check_database();
daemonize() if MogileFS->config("daemonize");

MogileFS::ProcManager->set_min_workers('queryworker' => MogileFS->config('query_jobs'));
MogileFS::ProcManager->set_min_workers('delete'      => MogileFS->config('delete_jobs'));
MogileFS::ProcManager->set_min_workers('replicate'   => MogileFS->config('replicate_jobs'));
MogileFS::ProcManager->set_min_workers('reaper'      => MogileFS->config('reaper_jobs'));
MogileFS::ProcManager->set_min_workers('monitor'     => MogileFS->config('monitor_jobs'));
MogileFS::ProcManager->set_min_workers('checker'     => MogileFS->config('checker_jobs'));

流れで言えば、

  1. MogileFS::Config->load_config -- configのロード
  2. check_database -- database接続確認
  3. daemonize -- daemon化*5
  4. MogileFS::ProcManager->set_min_workers -- 各種workerの起動

config, check_database, daemonizeはまぁ良いとしてProgManagerの方を見てみましょう。
実際にはset_min_workersをコールしているんで、そちらを見てみましょう。

MogileFS::ProcManager->set_min_workers

sub set_min_workers {
    my ($class, $job, $min) = @_;
    $jobs{$job} ||= [undef, 0];   # [min, current]
    $jobs{$job}->[0] = $min;

    # TODO: set allkipsup false, so spawner re-checks?
}

むむ、特に気になる処理してませんね。w


続きを読まねばならないのですがIO::Socket::INETとか、Danga::Socketとか使われ出してるんで、
さすがに深追いは禁物っぽぃ。。


以下ちょっと望文的にソースを読む事として、SIGハンドラの定義はまぁ置いといてその後の処理。

# setup server socket to listen for client connections
my $server = IO::Socket::INET->new(LocalPort => MogileFS->config('conf_port'),
                                   Type      => SOCK_STREAM,
                                   Proto     => 'tcp',
                                   Blocking  => 0,
                                   Reuse     => 1,
                                   Listen    => 10 )
    or die "Error creating socket: $@\n";

# accept handler for new clients
my $accept_handler = sub {
    my $csock = $server->accept
        or return;
    MogileFS::Connection::Client->new($csock);
};

# so children can close these once they fork
sub close_listeners {
    close($server);
}

# setup Danga::Socket to start handling connections
Danga::Socket->DebugLevel( 3 );
Danga::Socket->OtherFds( fileno($server)  => $accept_handler );

IO::Socket::INETのnewはコメントにあるとおり、client接続用のlistenポートを確保する。
$accept_handlerの部分は、クライアントからの接続をacceptした時のハンドラでしょう。その後のDanga::Socket->OtherFdsで渡してますね。


この時にMogileFS::Connection::Client->newってやってますが、このクラスを見てみます。

MogileFS::Connection::Client->new

package MogileFS::Connection::Client;

use strict;
use Danga::Socket ();
use base qw{Danga::Socket};

use fields qw{read_buf};

sub new {
    my $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new( @_ );
    $self->watch_read(1);
    return $self;
}

と言う訳でDanga::Socketの子クラス扱いですね。。。
と言う訳でちょっと逃げます。w*6

Danga::Socket->OtherFds

ここでリクエスト毎に生成されるであろうDanga::Socketの子クラス、MogileFS::Connection::Clientが
OtherFdsメソッドによって、並列処理対象として登録されます。((http://search.cpan.org/~bradfitz/Danga-Socket-1.53/Socket.pm#CLASS-%3EAddOtherFds(_%5B%25fdmap%5D_)))

mogilefsd

また戻ってさっきの続き

# setup the post event loop callback to spawn jobs, and the timeout
Danga::Socket->SetLoopTimeout( 250 ); # 250 milliseconds
Danga::Socket->SetPostLoopCallback(MogileFS::ProcManager->PostEventLoopChecker);

# and now, actually start listening for events
eval {
    print( "Starting event loop for frontend job on pid $$.\n" ) if $DEBUG;
    Danga::Socket->EventLoop();
};

各イベントループの最後に処理されるべきコールバックの指定でしょうね。

MogileFS::ProcManager->PostEventLoopChecker

これが重たい…、ソースが長いのと正確に読む気にはなれなかったんで、概略を言えば毎回リクエストを受け付ける度にあるべきmogilefsdプロセス及びその子プロセスの監視を行い、必要ならばforkしたりするみたいですね。


この時のchildプロセス生成がmake_childメソッドになっています。こちらを見てみましょう。

MogileFS::ProcManager->make_child

このメソッドの細かいところはとりあえず置いといて。
大事な部分はここでしょう。

    # now call our job function
    my $class = MogileFS::ProcManager->job_to_class($job)
        or die "No worker class defined for job '$job'\n";
    my $worker = $class->new($childs_ipc);

    # set our frontend into child mode
    MogileFS::ProcManager->SetAsChild($worker);

    $worker->work;

job_to_classメソッドは言うまでも無く、実際のjobを担当するclassを返すutilと考えて良いでしょう。
MogileFS::Worker::JobNameって感じに割り当てられます。


そしてそのworkerを子プロセスとして起動してあげるって流れでしょうね。
例としてClientからのリクエストを受け付けるqueryと言うjobに関してみてみましょう。

MogileFS::Worker::Query->work

大事なのは開いたacceptしたsocketハンドルの読み取り部分のループかな。

while (1) {
        my $rout;
        unless (select($rout=$rin, undef, undef, 5.0)) {
            $self->still_alive;
            next;
        }

        my $newread;
        my $rv = sysread($psock, $newread, 1024);
        $buf .= $newread;

        while ($buf =~ s/^(.+?)\r?\n//) {
            my $line = $1;
            $self->validate_dbh;
            if ($self->process_generic_command(\$line)) {
                $self->still_alive;  # no-op for watchdog
            } else {
                $self->process_line(\$line);
            }
        }
    }
}

余談だけど、sysreadの読み込みバッファ値とか設定出来ればいいのになぁ。
パフォーマンスチューニングとかで必要になるんじゃないんですかね?*7


読み込みバッファのパースしてる正規表現でgeneric_commandで無い場合はprocess_lineメソッドでコマンドとして扱われるみたいですね。詳しい事は置いといて、その該当部分見てみましょう。

MogileFS::Worker::Query->process_line

結局ここに行き着くはず。

    # fallback to normal command handling
    if ($line =~ /^(\w+)\s*(.*)/) {
        my ($cmd, $args) = ($1, $2);
        $cmd = lc($cmd);

        no strict 'refs';
        $self->{querystarttime} = Time::HiRes::gettimeofday();
        my $cmd_handler = *{"cmd_$cmd"}{CODE};
        if ($cmd_handler) {
            my $args = decode_url_args(\$args);
            Mgd::set_force_altzone(1) if $args->{zone} && $args->{zone} eq 'alt';
            $cmd_handler->($self, $args);
            return;
        }
    }

ここで実際にqueryとして渡しているclientからのコマンド別に存在するハンドラが割り当てられます。
シンボルリファレンス+型指定グロブってのが熱い。w


でそれを実行するみたいですね。
えーっとそもそも何のコマンド実行するんでしたっけ?(ぇ
create_openか…。orz...


続きは次回!(ぇ

*1:なんかMeadowで上手く動かないんだよなぁ。。。

*2:このペースだとマジで資料作成終わらん…

*3:渡す相手の無いPIPE使った時…らしいです。via http://www.linux.or.jp/JM/html/LDP_man-pages/man7/signal.7.html

*4:これ外に出して欲しいなぁ。。。

*5:daemonizeオプションが有効な時のみ

*6:読み始めたけど諦めました…

*7:識者の意見求む。