Note of MogileFS #08 - Inside MogileFS architecture (1)
はじめに…の前に
これを書き途中で2回もFireFoxが落ちた。しかもはてなのバックアップ機能を有効にしたけど、無意味だった罠。orz...
hatena-mode.elを本気で使いたくなってきた。*1
はじめに
シーケンス図もどきを書こうと思ったんですが、良く考えたら自信を持って書けない事が判明したので、泣く泣くソースを読む事にしました。*2
と言う訳で元の動機がその程度なので余り深く突っ込まない予定…だけど突っ込まざるを得ないか。
特にdatabaseに対して保存しておく情報のやり取りに関しては基本、触れない予定です。
題材としてはファイルの保存、レプリケーション周りを見てみます。
MogileFS::Client->new_file
保存系の操作は最終的にこのメソッドに行き着きます。
この辺りに関してはd:id:ZIGOROu:20061016:1161034490を参照。
sub new_file { my MogileFS::Client $self = shift; return undef if $self->{readonly}; my ($key, $class, $bytes, $opts) = @_; $bytes += 0; $opts ||= {}; my $res = $self->{backend}->do_request ("create_open", { domain => $self->{domain}, class => $class, key => $key, fid => $opts->{fid} || 0, # fid should be specified, or pass 0 meaning to auto-generate one multi_dest => 1, }) or return undef;
backendってのはMogileFS::Backendクラスで、その中のメソッドdo_requestにコマンドcreate_openってのを渡しています。
MogileFS::Backend->do_request
まぁやってる事は想像に難くないんですが、面白そうなので覗いて見ましょう。
sub do_request { my MogileFS::Backend $self = shift; my ($cmd, $args) = @_; _fail("invalid arguments to do_request") unless $cmd && $args; local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL; my $sock = $self->{sock_cache}; my $argstr = _encode_url_string(%$args); my $req = "$cmd $argstr\r\n"; my $reqlen = length($req); my $rv = 0; if ($sock) { # try our cached one, but assume it might be bogus _debug("SOCK: cached = $sock, REQ: $req"); $rv = send($sock, $req, $FLAG_NOSIGNAL); if ($! || ! defined $rv) { # undef is error, but $! may not be populated, we've found undef $self->{sock_cache}; } elsif ($rv != $reqlen) { return _fail("send() didn't return expected length ($rv, not $reqlen)"); } }
local $SIG{'PIPE'} = "IGNORE" なんて初めて見ましたな。このシグナル自体初耳なんですけどw*3
socketハンドルのキャッシュがある場合だけで十分ですけど、
my $argstr = _encode_url_string(%$args); my $req = "$cmd $argstr\r\n";
の部分、
$rv = send($sock, $req, $FLAG_NOSIGNAL);
だけで十分何やってるか分かりますね。
全部リクエストメソッドに乗せちゃってる。
その後のレスポンスのパース部分+を見る限りは、レスポンス自体もレスポンスコード部分で凝縮されてるみたいです。
- error
- ERR
- ok
- OK
こんなフォーマットレスポンスが返ってくるみたいですね。
ちなみに接続先は当たり前ですけど、/etc/mogilefs/mogilefsd.confで指定したconf_portで指定されているmogilefsdに行きます。
mogilefsdコマンド
まず注意しなければいけないのは、このソース中で二回package宣言があり、Mgd, MogileFSって言う二個のネームスペースが割り当てられています。*4
冒頭の起動処理を見てみましょう。
MogileFS::Config->load_config; # don't run as root die "mogilefsd cannot be run as root\n" if $< == 0 && MogileFS->config('user') ne "root"; check_database(); daemonize() if MogileFS->config("daemonize"); MogileFS::ProcManager->set_min_workers('queryworker' => MogileFS->config('query_jobs')); MogileFS::ProcManager->set_min_workers('delete' => MogileFS->config('delete_jobs')); MogileFS::ProcManager->set_min_workers('replicate' => MogileFS->config('replicate_jobs')); MogileFS::ProcManager->set_min_workers('reaper' => MogileFS->config('reaper_jobs')); MogileFS::ProcManager->set_min_workers('monitor' => MogileFS->config('monitor_jobs')); MogileFS::ProcManager->set_min_workers('checker' => MogileFS->config('checker_jobs'));
流れで言えば、
- MogileFS::Config->load_config -- configのロード
- check_database -- database接続確認
- daemonize -- daemon化*5
- MogileFS::ProcManager->set_min_workers -- 各種workerの起動
config, check_database, daemonizeはまぁ良いとしてProgManagerの方を見てみましょう。
実際にはset_min_workersをコールしているんで、そちらを見てみましょう。
MogileFS::ProcManager->set_min_workers
sub set_min_workers { my ($class, $job, $min) = @_; $jobs{$job} ||= [undef, 0]; # [min, current] $jobs{$job}->[0] = $min; # TODO: set allkipsup false, so spawner re-checks? }
むむ、特に気になる処理してませんね。w
続きを読まねばならないのですがIO::Socket::INETとか、Danga::Socketとか使われ出してるんで、
さすがに深追いは禁物っぽぃ。。
以下ちょっと望文的にソースを読む事として、SIGハンドラの定義はまぁ置いといてその後の処理。
# setup server socket to listen for client connections my $server = IO::Socket::INET->new(LocalPort => MogileFS->config('conf_port'), Type => SOCK_STREAM, Proto => 'tcp', Blocking => 0, Reuse => 1, Listen => 10 ) or die "Error creating socket: $@\n"; # accept handler for new clients my $accept_handler = sub { my $csock = $server->accept or return; MogileFS::Connection::Client->new($csock); }; # so children can close these once they fork sub close_listeners { close($server); } # setup Danga::Socket to start handling connections Danga::Socket->DebugLevel( 3 ); Danga::Socket->OtherFds( fileno($server) => $accept_handler );
IO::Socket::INETのnewはコメントにあるとおり、client接続用のlistenポートを確保する。
$accept_handlerの部分は、クライアントからの接続をacceptした時のハンドラでしょう。その後のDanga::Socket->OtherFdsで渡してますね。
この時にMogileFS::Connection::Client->newってやってますが、このクラスを見てみます。
MogileFS::Connection::Client->new
package MogileFS::Connection::Client; use strict; use Danga::Socket (); use base qw{Danga::Socket}; use fields qw{read_buf}; sub new { my $self = shift; $self = fields::new($self) unless ref $self; $self->SUPER::new( @_ ); $self->watch_read(1); return $self; }
と言う訳でDanga::Socketの子クラス扱いですね。。。
と言う訳でちょっと逃げます。w*6
Danga::Socket->OtherFds
ここでリクエスト毎に生成されるであろうDanga::Socketの子クラス、MogileFS::Connection::Clientが
OtherFdsメソッドによって、並列処理対象として登録されます。((http://search.cpan.org/~bradfitz/Danga-Socket-1.53/Socket.pm#CLASS-%3EAddOtherFds(_%5B%25fdmap%5D_)))
mogilefsd
また戻ってさっきの続き
# setup the post event loop callback to spawn jobs, and the timeout Danga::Socket->SetLoopTimeout( 250 ); # 250 milliseconds Danga::Socket->SetPostLoopCallback(MogileFS::ProcManager->PostEventLoopChecker); # and now, actually start listening for events eval { print( "Starting event loop for frontend job on pid $$.\n" ) if $DEBUG; Danga::Socket->EventLoop(); };
各イベントループの最後に処理されるべきコールバックの指定でしょうね。
MogileFS::ProcManager->PostEventLoopChecker
これが重たい…、ソースが長いのと正確に読む気にはなれなかったんで、概略を言えば毎回リクエストを受け付ける度にあるべきmogilefsdプロセス及びその子プロセスの監視を行い、必要ならばforkしたりするみたいですね。
この時のchildプロセス生成がmake_childメソッドになっています。こちらを見てみましょう。
MogileFS::ProcManager->make_child
このメソッドの細かいところはとりあえず置いといて。
大事な部分はここでしょう。
# now call our job function my $class = MogileFS::ProcManager->job_to_class($job) or die "No worker class defined for job '$job'\n"; my $worker = $class->new($childs_ipc); # set our frontend into child mode MogileFS::ProcManager->SetAsChild($worker); $worker->work;
job_to_classメソッドは言うまでも無く、実際のjobを担当するclassを返すutilと考えて良いでしょう。
MogileFS::Worker::JobNameって感じに割り当てられます。
そしてそのworkerを子プロセスとして起動してあげるって流れでしょうね。
例としてClientからのリクエストを受け付けるqueryと言うjobに関してみてみましょう。
MogileFS::Worker::Query->work
大事なのは開いたacceptしたsocketハンドルの読み取り部分のループかな。
while (1) { my $rout; unless (select($rout=$rin, undef, undef, 5.0)) { $self->still_alive; next; } my $newread; my $rv = sysread($psock, $newread, 1024); $buf .= $newread; while ($buf =~ s/^(.+?)\r?\n//) { my $line = $1; $self->validate_dbh; if ($self->process_generic_command(\$line)) { $self->still_alive; # no-op for watchdog } else { $self->process_line(\$line); } } } }
余談だけど、sysreadの読み込みバッファ値とか設定出来ればいいのになぁ。
パフォーマンスチューニングとかで必要になるんじゃないんですかね?*7
読み込みバッファのパースしてる正規表現でgeneric_commandで無い場合はprocess_lineメソッドでコマンドとして扱われるみたいですね。詳しい事は置いといて、その該当部分見てみましょう。
MogileFS::Worker::Query->process_line
結局ここに行き着くはず。
# fallback to normal command handling if ($line =~ /^(\w+)\s*(.*)/) { my ($cmd, $args) = ($1, $2); $cmd = lc($cmd); no strict 'refs'; $self->{querystarttime} = Time::HiRes::gettimeofday(); my $cmd_handler = *{"cmd_$cmd"}{CODE}; if ($cmd_handler) { my $args = decode_url_args(\$args); Mgd::set_force_altzone(1) if $args->{zone} && $args->{zone} eq 'alt'; $cmd_handler->($self, $args); return; } }
ここで実際にqueryとして渡しているclientからのコマンド別に存在するハンドラが割り当てられます。
シンボルリファレンス+型指定グロブってのが熱い。w
でそれを実行するみたいですね。
えーっとそもそも何のコマンド実行するんでしたっけ?(ぇ
create_openか…。orz...
続きは次回!(ぇ