Note of MogileFS #08 - Inside MogileFS architecture (1)
はじめに…の前に
これを書き途中で2回もFireFoxが落ちた。しかもはてなのバックアップ機能を有効にしたけど、無意味だった罠。orz...
hatena-mode.elを本気で使いたくなってきた。*1
はじめに
シーケンス図もどきを書こうと思ったんですが、良く考えたら自信を持って書けない事が判明したので、泣く泣くソースを読む事にしました。*2
と言う訳で元の動機がその程度なので余り深く突っ込まない予定…だけど突っ込まざるを得ないか。
特にdatabaseに対して保存しておく情報のやり取りに関しては基本、触れない予定です。
題材としてはファイルの保存、レプリケーション周りを見てみます。
MogileFS::Client->new_file
保存系の操作は最終的にこのメソッドに行き着きます。
この辺りに関してはd:id:ZIGOROu:20061016:1161034490を参照。
sub new_file {
my MogileFS::Client $self = shift;
return undef if $self->{readonly};
my ($key, $class, $bytes, $opts) = @_;
$bytes += 0;
$opts ||= {};
my $res = $self->{backend}->do_request
("create_open", {
domain => $self->{domain},
class => $class,
key => $key,
fid => $opts->{fid} || 0, # fid should be specified, or pass 0 meaning to auto-generate one
multi_dest => 1,
}) or return undef;backendってのはMogileFS::Backendクラスで、その中のメソッドdo_requestにコマンドcreate_openってのを渡しています。
MogileFS::Backend->do_request
まぁやってる事は想像に難くないんですが、面白そうなので覗いて見ましょう。
sub do_request {
my MogileFS::Backend $self = shift;
my ($cmd, $args) = @_;
_fail("invalid arguments to do_request")
unless $cmd && $args;
local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
my $sock = $self->{sock_cache};
my $argstr = _encode_url_string(%$args);
my $req = "$cmd $argstr\r\n";
my $reqlen = length($req);
my $rv = 0;
if ($sock) {
# try our cached one, but assume it might be bogus
_debug("SOCK: cached = $sock, REQ: $req");
$rv = send($sock, $req, $FLAG_NOSIGNAL);
if ($! || ! defined $rv) {
# undef is error, but $! may not be populated, we've found
undef $self->{sock_cache};
} elsif ($rv != $reqlen) {
return _fail("send() didn't return expected length ($rv, not $reqlen)");
}
}local $SIG{'PIPE'} = "IGNORE" なんて初めて見ましたな。このシグナル自体初耳なんですけどw*3
socketハンドルのキャッシュがある場合だけで十分ですけど、
my $argstr = _encode_url_string(%$args); my $req = "$cmd $argstr\r\n";
の部分、
$rv = send($sock, $req, $FLAG_NOSIGNAL);
だけで十分何やってるか分かりますね。
全部リクエストメソッドに乗せちゃってる。
その後のレスポンスのパース部分+を見る限りは、レスポンス自体もレスポンスコード部分で凝縮されてるみたいです。
- error
- ERR
- ok
- OK
こんなフォーマットレスポンスが返ってくるみたいですね。
ちなみに接続先は当たり前ですけど、/etc/mogilefs/mogilefsd.confで指定したconf_portで指定されているmogilefsdに行きます。
mogilefsdコマンド
まず注意しなければいけないのは、このソース中で二回package宣言があり、Mgd, MogileFSって言う二個のネームスペースが割り当てられています。*4
冒頭の起動処理を見てみましょう。
MogileFS::Config->load_config;
# don't run as root
die "mogilefsd cannot be run as root\n"
if $< == 0 && MogileFS->config('user') ne "root";
check_database();
daemonize() if MogileFS->config("daemonize");
MogileFS::ProcManager->set_min_workers('queryworker' => MogileFS->config('query_jobs'));
MogileFS::ProcManager->set_min_workers('delete' => MogileFS->config('delete_jobs'));
MogileFS::ProcManager->set_min_workers('replicate' => MogileFS->config('replicate_jobs'));
MogileFS::ProcManager->set_min_workers('reaper' => MogileFS->config('reaper_jobs'));
MogileFS::ProcManager->set_min_workers('monitor' => MogileFS->config('monitor_jobs'));
MogileFS::ProcManager->set_min_workers('checker' => MogileFS->config('checker_jobs'));流れで言えば、
- MogileFS::Config->load_config -- configのロード
- check_database -- database接続確認
- daemonize -- daemon化*5
- MogileFS::ProcManager->set_min_workers -- 各種workerの起動
config, check_database, daemonizeはまぁ良いとしてProgManagerの方を見てみましょう。
実際にはset_min_workersをコールしているんで、そちらを見てみましょう。
MogileFS::ProcManager->set_min_workers
sub set_min_workers {
my ($class, $job, $min) = @_;
$jobs{$job} ||= [undef, 0]; # [min, current]
$jobs{$job}->[0] = $min;
# TODO: set allkipsup false, so spawner re-checks?
}むむ、特に気になる処理してませんね。w
続きを読まねばならないのですがIO::Socket::INETとか、Danga::Socketとか使われ出してるんで、
さすがに深追いは禁物っぽぃ。。
以下ちょっと望文的にソースを読む事として、SIGハンドラの定義はまぁ置いといてその後の処理。
# setup server socket to listen for client connections
my $server = IO::Socket::INET->new(LocalPort => MogileFS->config('conf_port'),
Type => SOCK_STREAM,
Proto => 'tcp',
Blocking => 0,
Reuse => 1,
Listen => 10 )
or die "Error creating socket: $@\n";
# accept handler for new clients
my $accept_handler = sub {
my $csock = $server->accept
or return;
MogileFS::Connection::Client->new($csock);
};
# so children can close these once they fork
sub close_listeners {
close($server);
}
# setup Danga::Socket to start handling connections
Danga::Socket->DebugLevel( 3 );
Danga::Socket->OtherFds( fileno($server) => $accept_handler );IO::Socket::INETのnewはコメントにあるとおり、client接続用のlistenポートを確保する。
$accept_handlerの部分は、クライアントからの接続をacceptした時のハンドラでしょう。その後のDanga::Socket->OtherFdsで渡してますね。
この時にMogileFS::Connection::Client->newってやってますが、このクラスを見てみます。
MogileFS::Connection::Client->new
package MogileFS::Connection::Client;
use strict;
use Danga::Socket ();
use base qw{Danga::Socket};
use fields qw{read_buf};
sub new {
my $self = shift;
$self = fields::new($self) unless ref $self;
$self->SUPER::new( @_ );
$self->watch_read(1);
return $self;
}と言う訳でDanga::Socketの子クラス扱いですね。。。
と言う訳でちょっと逃げます。w*6
Danga::Socket->OtherFds
ここでリクエスト毎に生成されるであろうDanga::Socketの子クラス、MogileFS::Connection::Clientが
OtherFdsメソッドによって、並列処理対象として登録されます。((http://search.cpan.org/~bradfitz/Danga-Socket-1.53/Socket.pm#CLASS-%3EAddOtherFds(_%5B%25fdmap%5D_)))
mogilefsd
また戻ってさっきの続き
# setup the post event loop callback to spawn jobs, and the timeout
Danga::Socket->SetLoopTimeout( 250 ); # 250 milliseconds
Danga::Socket->SetPostLoopCallback(MogileFS::ProcManager->PostEventLoopChecker);
# and now, actually start listening for events
eval {
print( "Starting event loop for frontend job on pid $$.\n" ) if $DEBUG;
Danga::Socket->EventLoop();
};各イベントループの最後に処理されるべきコールバックの指定でしょうね。
MogileFS::ProcManager->PostEventLoopChecker
これが重たい…、ソースが長いのと正確に読む気にはなれなかったんで、概略を言えば毎回リクエストを受け付ける度にあるべきmogilefsdプロセス及びその子プロセスの監視を行い、必要ならばforkしたりするみたいですね。
この時のchildプロセス生成がmake_childメソッドになっています。こちらを見てみましょう。
MogileFS::ProcManager->make_child
このメソッドの細かいところはとりあえず置いといて。
大事な部分はここでしょう。
# now call our job function
my $class = MogileFS::ProcManager->job_to_class($job)
or die "No worker class defined for job '$job'\n";
my $worker = $class->new($childs_ipc);
# set our frontend into child mode
MogileFS::ProcManager->SetAsChild($worker);
$worker->work;job_to_classメソッドは言うまでも無く、実際のjobを担当するclassを返すutilと考えて良いでしょう。
MogileFS::Worker::JobNameって感じに割り当てられます。
そしてそのworkerを子プロセスとして起動してあげるって流れでしょうね。
例としてClientからのリクエストを受け付けるqueryと言うjobに関してみてみましょう。
MogileFS::Worker::Query->work
大事なのは開いたacceptしたsocketハンドルの読み取り部分のループかな。
while (1) {
my $rout;
unless (select($rout=$rin, undef, undef, 5.0)) {
$self->still_alive;
next;
}
my $newread;
my $rv = sysread($psock, $newread, 1024);
$buf .= $newread;
while ($buf =~ s/^(.+?)\r?\n//) {
my $line = $1;
$self->validate_dbh;
if ($self->process_generic_command(\$line)) {
$self->still_alive; # no-op for watchdog
} else {
$self->process_line(\$line);
}
}
}
}余談だけど、sysreadの読み込みバッファ値とか設定出来ればいいのになぁ。
パフォーマンスチューニングとかで必要になるんじゃないんですかね?*7
読み込みバッファのパースしてる正規表現でgeneric_commandで無い場合はprocess_lineメソッドでコマンドとして扱われるみたいですね。詳しい事は置いといて、その該当部分見てみましょう。
MogileFS::Worker::Query->process_line
結局ここに行き着くはず。
# fallback to normal command handling
if ($line =~ /^(\w+)\s*(.*)/) {
my ($cmd, $args) = ($1, $2);
$cmd = lc($cmd);
no strict 'refs';
$self->{querystarttime} = Time::HiRes::gettimeofday();
my $cmd_handler = *{"cmd_$cmd"}{CODE};
if ($cmd_handler) {
my $args = decode_url_args(\$args);
Mgd::set_force_altzone(1) if $args->{zone} && $args->{zone} eq 'alt';
$cmd_handler->($self, $args);
return;
}
}ここで実際にqueryとして渡しているclientからのコマンド別に存在するハンドラが割り当てられます。
シンボルリファレンス+型指定グロブってのが熱い。w
でそれを実行するみたいですね。
えーっとそもそも何のコマンド実行するんでしたっけ?(ぇ
create_openか…。orz...
続きは次回!(ぇ