Ceph OSD Request Analysis

An Example OSD Message

osd_op(client.4813.0:510 default.4161.780__shadow__99999.txt_0 [write 2097152~524288] 5.ba5249f6 ack+ondisk+write+known_if_redirected e130) v4

class MOSDOp : public Message {
  ...
  void print(ostream& out) const {
    out << "osd_op(" << get_reqid();                    // client.4813.0:510
    out << " ";
    if (!oloc.nspace.empty())
      out << oloc.nspace << "/";
    out << oid;                                         // default.4161.780__shadow__99999.txt_0

#if 0
    out << " ";
    if (may_read())
      out << "r";
    if (may_write())
      out << "w";
#endif
    if (snapid != CEPH_NOSNAP)
      out << "@" << snapid;

    if (oloc.key.size())
      out << " " << oloc;

    out << " " << ops;                                  // [write 2097152~524288]
    out << " " << pgid;                                 // 5.ba5249f6
    if (is_retry_attempt())
      out << " RETRY=" << get_retry_attempt();
    if (reassert_version != eversion_t())
      out << " reassert_version=" << reassert_version;
    if (get_snap_seq())
      out << " snapc " << get_snap_seq() << "=" << snaps;
    out << " " << ceph_osd_flag_string(get_flags());    // ack+ondisk+write+known_if_redirected
    out << " e" << osdmap_epoch;                        // e130
    out << ")";
  }
};
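Reading the example line against the code: client.4813.0:510 is the request id (client global id 4813, incarnation 0, tid 510), [write 2097152~524288] is an offset~length extent (512 KiB written at the 2 MiB offset), 5.ba5249f6 is pool 5 plus the object's hash, and e130 is the osdmap epoch. A tiny standalone sketch (not Ceph code) of the offset~length notation:

#include <cstdint>
#include <cstdio>
#include <sstream>
#include <string>

int main() {
  std::string extent = "2097152~524288";   // from "[write 2097152~524288]"
  uint64_t off = 0, len = 0;
  char sep = 0;
  std::istringstream in(extent);
  in >> off >> sep >> len;                 // sep consumes the '~'
  std::printf("write %llu bytes at offset %llu\n",
              (unsigned long long)len, (unsigned long long)off);
  return 0;
}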

OSD Message Processing Flow

Processing flow on the Primary OSD

bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
void OSD::enqueue_op(PG *pg, OpRequestRef& op)
void PG::queue_op(OpRequestRef& op) -- { osd->op_wq.queue(make_pair(PGRef(this), op));}

void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
void OSD::dequeue_op(PGRef pg, OpRequestRef op, ThreadPool::TPHandle &handle)
void ReplicatedPG::do_request(OpRequestRef& op, ThreadPool::TPHandle &handle)
void ReplicatedPG::do_op(OpRequestRef& op) // case CEPH_MSG_OSD_OP
void ReplicatedPG::execute_ctx(OpContext *ctx)
|-- int ReplicatedPG::prepare_transaction(OpContext *ctx)
| int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
|-- void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
| void ReplicatedBackend::submit_transaction()
| |-- void ReplicatedBackend::issue_op()
| | |-- void ReplicatedPG::send_message_osd_cluster() // send write data to replica osds
| | |-- void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
| |-- parent->queue_transaction(op_t, op.op);
| | |-- int FileStore::queue_transactions() // with no journal, or when journal and data disk are written in parallel, this calls queue_op(osr, o)
| | |-- void JournalingObjectStore::_op_journal_transactions()
| | |-- void FileJournal::submit_entry() // appends to deque<write_item> writeq
|-- void ReplicatedPG::eval_repop(RepGather *repop)
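The enqueue/dequeue pair above is a classic work-queue handoff: handle_op() pushes a (PG, op) pair onto the sharded work queue, and a thread-pool worker pops it and eventually calls do_request(). A minimal self-contained sketch of that handoff, with placeholder types standing in for PGRef and OpRequestRef:

#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

struct PG;         // placeholders for PGRef / OpRequestRef
struct OpRequest;

class OpWQ {
  std::queue<std::pair<PG*, OpRequest*>> q;
  std::mutex m;
  std::condition_variable cv;
public:
  void queue(std::pair<PG*, OpRequest*> item) {  // PG::queue_op() side
    std::lock_guard<std::mutex> l(m);
    q.push(item);
    cv.notify_one();
  }
  std::pair<PG*, OpRequest*> dequeue() {         // worker (_process) side
    std::unique_lock<std::mutex> l(m);
    cv.wait(l, [this] { return !q.empty(); });
    auto item = q.front();
    q.pop();
    return item;
  }
};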

Asynchronous journal writes

FileJournal.Writer.entry() -> FileJournal::write_thread_entry() -> FileJournal::do_write() -> FileJournal::write_bl()
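submit_entry() appends to writeq, and the writer thread batches whatever has accumulated before issuing a single journal write. A condensed sketch of that producer/consumer loop (illustrative shapes, not the real FileJournal members):

#include <condition_variable>
#include <deque>
#include <mutex>

struct write_item { /* journal entry payload */ };

std::deque<write_item> writeq;
std::mutex writeq_lock;
std::condition_variable writeq_cond;
bool write_stop = false;

void write_thread_entry() {
  std::unique_lock<std::mutex> l(writeq_lock);
  while (!write_stop) {
    if (writeq.empty()) {       // nothing queued: sleep until submit_entry() signals
      writeq_cond.wait(l);
      continue;
    }
    std::deque<write_item> batch;
    batch.swap(writeq);         // grab everything queued so far as one batch
    l.unlock();
    // do_write(batch): format the entries and write_bl() them to the journal device
    l.lock();
  }
}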

Processing flow on a Replica OSD

bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
void OSD::enqueue_op(PG *pg, OpRequestRef& op)
void PG::queue_op(OpRequestRef& op) -- { osd->op_wq.queue(make_pair(PGRef(this), op));}

void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
void OSD::dequeue_op(PGRef pg, OpRequestRef op, ThreadPool::TPHandle &handle)
void ReplicatedPG::do_request(OpRequestRef& op, ThreadPool::TPHandle &handle)
bool ReplicatedBackend::handle_message()
void ReplicatedBackend::sub_op_modify(OpRequestRef op)
|-- parent->queue_transaction(op_t, op.op);
| |-- int FileStore::queue_transactions()

Callback Functions

Before the transaction is queued, several operation callbacks are registered on it:

class ObjectStore::Transaction {
  ...
  list<Context *> on_applied;
  list<Context *> on_commit;
  list<Context *> on_applied_sync;
  ...
};
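register_on_applied()/register_on_commit() simply append to these lists, and the store completes every queued Context when the corresponding stage finishes. A minimal sketch of the mechanism, assuming the usual Context contract (complete() calls finish() and then deletes the object):

#include <list>

struct Context {
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  void complete(int r) { finish(r); delete this; }
};

struct MiniTransaction {
  std::list<Context*> on_applied;
  std::list<Context*> on_commit;

  void register_on_applied(Context *c) { if (c) on_applied.push_back(c); }
  void register_on_commit(Context *c)  { if (c) on_commit.push_back(c); }

  // called by the backing store once the corresponding stage completes
  static void run(std::list<Context*> &ls, int r) {
    for (Context *c : ls)
      c->complete(r);
    ls.clear();
  }
};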

Primary OSD

— Contexts for the operations sent to the replicas; they track whether every replica reply has come back
Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
  repop->obc,
  repop->ctx->clone_obc,
  unlock_snapset_obc ? repop->ctx->snapset_obc : ObjectContextRef());

— Contexts for the operation on this OSD itself; they track completion of the local write
op_t->register_on_applied_sync(on_local_applied_sync);
op_t->register_on_applied(
  parent->bless_context(
    new C_OSD_OnOpApplied(this, &op)));
op_t->register_on_applied(
  new ObjectStore::C_DeleteTransaction(op_t));
op_t->register_on_commit(
  parent->bless_context(
    new C_OSD_OnOpCommit(this, &op)));
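bless_context() wraps a raw callback so that when a finisher thread later runs it, it executes with the PG lock held. A paraphrased sketch of that wrapper (simplified names; the real classes live in ReplicatedPG):

// Paraphrase only: PG and Context are the surrounding Ceph types.
struct BlessedContext : public Context {
  PG *pg;          // the PG whose lock must be held
  Context *inner;  // the real callback, e.g. C_OSD_OnOpCommit
  BlessedContext(PG *p, Context *c) : pg(p), inner(c) {}
  void finish(int r) {
    pg->lock();
    inner->complete(r);  // run the callback under the PG lock
    pg->unlock();
  }
};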

Replica OSD

— Contexts for the operation on this OSD itself; they track completion of the local write
rm->localt.register_on_commit(
  parent->bless_context(
    new C_OSD_RepModifyCommit(this, rm)));
rm->localt.register_on_applied(
  parent->bless_context(
    new C_OSD_RepModifyApply(this, rm)));

The Finisher Class

Finisher, defined in src/common, is a class whose sole job is to detect when operations have finished.

The class owns a thread, finisher_thread, and a queue of Context pointers, finisher_queue. When a worker thread completes its operation, it pushes a Context object onto the queue. finisher_thread continuously watches finisher_queue; whenever new Contexts arrive, it calls Context::complete(), which in turn calls the finish() implemented by the Context subclass to carry out the follow-up work.

class Finisher {
  CephContext *cct;
  ...
  vector<Context*> finisher_queue;
  ...
  void *finisher_thread_entry();
  struct FinisherThread : public Thread {
    Finisher *fin;
    FinisherThread(Finisher *f) : fin(f) {}
    void* entry() {
      return (void*)fin->finisher_thread_entry();
    }
  } finisher_thread;
  ...
};

void *Finisher::finisher_thread_entry()
{
  ...
  while (!finisher_stop) {
    while (!finisher_queue.empty()) {
      vector<Context*> ls;
      ls.swap(finisher_queue);
      ...
      for (vector<Context*>::iterator p = ls.begin();
           p != ls.end();
           ++p) {
        if (*p) {
          (*p)->complete(0);  // invokes the finish() implemented by the Context subclass
        }
        ...
      }
      ...
    }
    ...
  }
}
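The same pattern in miniature: a self-contained finisher built on std::thread that can be compiled and run on its own (illustrative, not the Ceph class):

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

struct Context {
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  void complete(int r) { finish(r); delete this; }
};

class MiniFinisher {
  std::vector<Context*> q;
  std::mutex m;
  std::condition_variable cv;
  bool stop = false;
  std::thread thr;

  void entry() {
    std::unique_lock<std::mutex> l(m);
    while (!stop || !q.empty()) {
      if (q.empty()) { cv.wait(l); continue; }
      std::vector<Context*> ls;
      ls.swap(q);          // same swap trick as Finisher
      l.unlock();
      for (Context *c : ls)
        c->complete(0);    // runs each subclass's finish()
      l.lock();
    }
  }

public:
  MiniFinisher() : thr([this] { entry(); }) {}
  ~MiniFinisher() {
    { std::lock_guard<std::mutex> l(m); stop = true; }
    cv.notify_one();
    thr.join();            // drains the queue before exiting
  }
  void queue(Context *c) {
    std::lock_guard<std::mutex> l(m);
    q.push_back(c);
    cv.notify_one();
  }
};

struct Done : Context {
  void finish(int) { std::cout << "callback ran\n"; }
};

int main() {
  MiniFinisher f;
  f.queue(new Done);
  return 0;
}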

IO-related Finishers

Three finishers are involved in the IO path: one in the journaling layer and two in the FileStore:

class JournalingObjectStore : public ObjectStore {
protected:
  Journal *journal;
  Finisher finisher;  // notifies completion of journal writes
  ...
};

class FileStore : public JournalingObjectStore,
                  public md_config_obs_t
{
  Finisher ondisk_finisher;  // notifies commit once data reaches the journal
  Finisher op_finisher;      // notifies apply once data reaches the filestore
};

Writing Data to the FileStore and IO Callbacks

Primary OSD

int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
                                  TrackedOpRef osd_op,
                                  ThreadPool::TPHandle *handle)
{
  _op_journal_transactions(o->tls, o->op,
                           new C_JournaledAhead(this, osr, o, ondisk),
                           osd_op);
  ...
}

struct C_JournaledAhead : public Context {
  FileStore *fs;
  FileStore::OpSequencer *osr;
  FileStore::Op *o;
  Context *ondisk;

  C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
    fs(f), osr(os), o(o), ondisk(ondisk) { }
  void finish(int r) {
    fs->_journaled_ahead(osr, o, ondisk);
  }
};

After the journal commit, the finisher of the JournalingObjectStore class invokes C_JournaledAhead::finish().

_journaled_ahead() then adds the op both to the FileStore write queue and to the queue of FileStore's ondisk_finisher.

void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
|-- queue_op(osr, o); — void FileStore::queue_op(OpSequencer *osr, Op *o) { op_wq.queue(osr); } puts the op onto FileStore's op_wq.
|-- ondisk_finisher.queue(to_queue); — via ondisk_finisher, notifies the upper layer that the IO has committed to the journal; this ends up calling C_OSD_OnOpCommit::finish().

FileStore workqueue processing:

FileStore.OpWQ._process { store->_do_op(osr, handle); }
void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
{
  ...
  apply_manager.op_apply_start(o->op);
  int r = _do_transactions(o->tls, o->op, &handle);
  apply_manager.op_apply_finish(o->op);
  ...
}

FileStore::_do_transactions() { r = _do_transaction(**p, op_seq, trans_num, handle); }
unsigned FileStore::_do_transaction() { r = _write(cid, oid, off, len, bl, replica); // case write }
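For the write case, _write() ultimately resolves the object to a file under current/ and writes the buffer at the requested offset. Stripped to its essence (illustrative only; the real code goes through an fd cache and long-filename handling):

#include <cerrno>
#include <cstdint>
#include <fcntl.h>
#include <string>
#include <unistd.h>

int object_write(const std::string &path, uint64_t off,
                 const char *buf, size_t len) {
  int fd = ::open(path.c_str(), O_WRONLY | O_CREAT, 0644);
  if (fd < 0)
    return -errno;
  ssize_t r = ::pwrite(fd, buf, len, off);  // write at the object offset
  int ret = (r < 0) ? -errno : 0;
  ::close(fd);
  return ret;
}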

C_OSD_OnOpCommit::finish() calls into:

void ReplicatedBackend::op_commit(InProgressOp *op)
{
  ...
  op->waiting_for_commit.erase(get_parent()->whoami_shard());  // drop this shard from the waiting_for_commit set

  if (op->waiting_for_commit.empty()) {  // empty means every OSD's write has returned
    op->on_commit->complete(0);          // ends up in C_OSD_RepopCommit::finish()
    op->on_commit = 0;
  }
  if (op->done()) {
    assert(!op->on_commit && !op->on_applied);
    in_progress_ops.erase(op->tid);
  }
}

Replica OSD

Processing flow after a replica OSD's write reply returns:

void ReplicatedBackend::sub_op_modify_reply(OpRequestRef op)
{
  ...
  if (in_progress_ops.count(rep_tid)) {
    ip_op.waiting_for_applied.erase(from);
    ...
    if (ip_op.waiting_for_applied.empty() &&
        ip_op.on_applied) {
      ip_op.on_applied->complete(0);  // C_OSD_RepopApplied::finish()
      ip_op.on_applied = 0;
    }
    if (ip_op.waiting_for_commit.empty() &&
        ip_op.on_commit) {
      ip_op.on_commit->complete(0);   // C_OSD_RepopCommit::finish()
      ip_op.on_commit = 0;
    }
    if (ip_op.done()) {
      assert(!ip_op.on_commit && !ip_op.on_applied);
      in_progress_ops.erase(iter);
    }
  }
  ...
}

C_OSD_RepopCommit::finish() calls into:

void ReplicatedPG::repop_all_committed(RepGather *repop)
{
  ...
  repop->all_committed = true;

  if (!repop->rep_aborted) {
    ...
    eval_repop(repop);  // sends the reply back to the client
  }
}
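eval_repop() then decides which acks can be sent: the ondisk reply goes back to the client once every shard has committed. A condensed sketch of that decision (field names follow RepGather; the real function also handles the applied ack and waiting lists):

struct MiniRepGather {
  bool sent_disk = false;      // ondisk reply already sent?
  bool all_committed = false;  // every shard committed to its journal
  bool all_applied = false;    // every shard applied to its filestore
};

bool should_send_ondisk_reply(const MiniRepGather &r) {
  return r.all_committed && !r.sent_disk;  // committed everywhere: ondisk ack
}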

Syncing Data to Disk

FileStore syncs its data to disk through the underlying filesystem's sync interface:

class FileStore : public JournalingObjectStore,
                  public md_config_obs_t
{
  struct SyncThread : public Thread {
    FileStore *fs;
    SyncThread(FileStore *f) : fs(f) {}
    void *entry() {
      fs->sync_entry();
      return 0;
    }
  } sync_thread;
  ...
};

FileStore::mount() creates the sync thread via sync_thread.create(); the thread's entry function is:

void FileStore::sync_entry()
{
  utime_t max_interval;
  max_interval.set_from_double(m_filestore_max_sync_interval);  // configurable
  utime_t min_interval;
  min_interval.set_from_double(m_filestore_min_sync_interval);  // configurable
  ...
  sync_cond.WaitInterval(g_ceph_context, lock, max_interval);   // wait up to max_interval
  ...
  if (apply_manager.commit_start()) {
    ...
    apply_manager.commit_started();
    int err = backend->syncfs();
    err = write_op_seq(op_fd, cp);
    err = ::fsync(op_fd);
    apply_manager.commit_finish();
  }
  ...
}
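WaitInterval() gives the usual "sync every max_interval seconds, but wake early when signalled" loop. The equivalent shape with std::condition_variable (a sketch, not the Ceph Cond API):

#include <chrono>
#include <condition_variable>
#include <mutex>

std::mutex lock;
std::condition_variable sync_cond;
bool sync_requested = false;

void sync_loop_once(std::chrono::seconds max_interval) {
  std::unique_lock<std::mutex> l(lock);
  // Wake when signalled (e.g. journal half full) or after max_interval.
  sync_cond.wait_for(l, max_interval, [] { return sync_requested; });
  sync_requested = false;
  // ... commit_start() / syncfs() / commit_finish() as above ...
}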

This mainly uses a filesystem-level sync call to flush the data of the files FileStore has open to disk.

int GenericFileStoreBackend::syncfs()
{
  ...
  if (m_filestore_fsync_flushes_journal_data) {
    ret = ::fsync(get_op_fd());
  } else {
    ret = sync_filesystem(get_current_fd());
  }
  return ret;
}
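sync_filesystem() prefers the Linux syncfs(2) syscall, which flushes just the one filesystem instead of sync(2)'s machine-wide flush. Minimal usage (g++ defines _GNU_SOURCE, which syncfs() requires):

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main() {
  int fd = open(".", O_RDONLY);  // any fd on the target filesystem
  if (fd < 0) { perror("open"); return 1; }
  if (syncfs(fd) < 0) {          // flush everything dirty on this filesystem
    perror("syncfs");
    return 1;
  }
  close(fd);
  return 0;
}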

commit_finish() moves the commit_waiters onto the finisher queue; the finisher then delivers the commit-completed notifications:

void JournalingObjectStore::ApplyManager::commit_finish()
{
  ...
  if (journal)
    journal->committed_thru(committing_seq);

  committed_seq = committing_seq;

  map<version_t, vector<Context*> >::iterator p = commit_waiters.begin();
  while (p != commit_waiters.end() &&
         p->first <= committing_seq) {
    finisher.queue(p->second);
    commit_waiters.erase(p++);
  }
}

When the sync thread is woken:

— sync_cond.Signal();
void FileStore::_start_sync() <- unsigned FileStore::_do_transaction() { case Transaction::OP_STARTSYNC: }
{
  if (!journal) {
    sync_cond.Signal();
  }
}
void FileStore::do_force_sync() <- int FileStore::umount()
void FileStore::start_sync(Context *onsafe) <- void FileStore::sync()

FileJournal's do_sync_cond also points to FileStore's sync_cond:

int FileStore::open_journal()
{
  ...
  journal = new FileJournal(fsid, &finisher, &sync_cond, journalpath.c_str(),
                            m_journal_dio, m_journal_aio, m_journal_force_aio);
  ...
}

FileJournal.Writer.entry()
-> FileJournal::write_thread_entry()
-> FileJournal::prepare_multi_write()
-> FileJournal::prepare_single_write()
-> FileJournal::check_for_full()
   {
     // passing half full mark, triggering commit
     do_sync_cond->SloppySignal();  // initiate a real commit so we can trim
   }
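The half-full check is ring-buffer arithmetic: once more than half of the journal holds entries that have not been trimmed yet, the sync thread is woken so a commit can free space. A hypothetical version of that check (journal_half_full is an illustrative helper, not a Ceph function):

#include <cstdint>

bool journal_half_full(uint64_t journal_size, uint64_t start_pos, uint64_t write_pos) {
  uint64_t used = (write_pos >= start_pos)
                      ? write_pos - start_pos
                      : journal_size - (start_pos - write_pos);  // wrapped ring
  return used > journal_size / 2;  // past half: commit so we can trim
}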