原链接
http://blog.csdn.net/litianze99/article/details/51489610
OSD Messenger通过网络层通过OSD的ms_fast_dispatch()方法把接收到的消息分发该osd实例来处理。 OSD::ms_fast_dispatch(Message *m) 1.检测OSDService服务是否已经停止,如果是则丢弃该消息(该对象的引用计数减1 nref.dec(),如果引用计数为0则删除该对象)。 2.OpRequestRef op = op_tracker.create_request<OpRequest>(m);把message消息封装到OpRequest对象中,该对象中同时也包含OpTracker,OpTracker负责跟踪OpRequest对象, 伴随其整改生命周期,但是该OpTracker的功能可以通过配置来启用/禁止。可用于op处理的性能提升。 3. OSDMapRef nextmap = service.get_nextmap_reserved();从OSDService中获取next_osdmap!!!的引用。(next_osdmap - pre_published map that is about to be published. osdmap - current published map) 4. Session *session = static_cast<Session*>(m->get_connection()->get_priv());获取当前连接的session!!!。 5.session->waiting_on_map.push_back(op); 把op放入waiting_on_map列表中。 6.分发堵塞在该session上的所有op。 OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap) 。 OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap) 如果op携带的osdmap的epoch不大于osd的osdmap epoch就根据OpRequest的类型来作出相应的处理。 针对该op做大量的检测类工作。(OSD) OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)依客户端的请求为例 1.如该op的客户端失联,则放弃处理该op。 2.判断该请求message带的object的名字是否合法。 3.判断op的请求源是否合法 4.判断是否需要更新client的map。 5.从消息中获取对应的pg,及pg所对应的pool 6.如果该操作是一个写操作,就进行一系列的检查工作,pool是否满、对象时否过大、当前集群是否满了。 7.oadmap->raw_pg_to_pg,使用pg_pool_t的raw_pg_to_pg,将pg映射到真是的pg上。 8.判断该pg对应的是否是ec pool,若是,其pg对应的acting set中有其对应的primary osd则,返回true,否则是false;如果不是ec pool则返回true;以上都将pg_t转成spg_t。 9.重新获取message epoch对应的OSDMap实例,若不能获得,或者在该osdmap中若没有该op对应pg所在的pool、或者当前OSD不能处理该op,则终止处理该op。 10.再次判断是否有pg 或者pool来对应该op。 11. PG *pg = get_pg_or_queue_for_pg(pgid, op);获取该pgid对应的PG实例。 12.PG::queue_op(OpRequestRef& op),最终把op放入到OSDService::op_wq队列中。 在PG层面上为OP做检查。 PG::queue_op(OpRequestRef& op) 1.在PG层面上如果PG::waiting_for_map不为空说明当前的OSDMap不可用,把op放入到waiting_for_map队列中,不能继续处理。 2.op_must_wait_for_map(get_osdmap_with_maplock()->get_epoch(), op) 判断当前pg的epoch是否大于op的epoch,若不大于则将op放入到PG::waiting_for_map队列中等待新的pg epoch。 总之在OSD或在PG层面上op的epoch就不能大于OSD或者PG的epoch,否则都要放入相应的waiting_for_map中等待。 3.osd->op_wq.queue(make_pair(PGRef(this), op)); 把op放入OSDService::op_wq队列中。 session结束。 op_wq是一个ShardedWQ实例,会被对应的线程池中的线程处理,具体处理过程在 OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb ) 1.op->run(osd, item.first, tp_handle); osd->dequeue_op(pg, op, handle); 在worker thread中该方法被调用。 OSD::dequeue_op( PGRef pg, OpRequestRef op, ThreadPool::TPHandle &handle) 1.如果op->send_map_update 为true则调用service.share_map(...) 通知对方更新osdmap。 2.pg->do_request(op, handle);==>ReplicatedPG::do_request(OpRequestRef& op, ThreadPool::TPHandle &handle) 进入ReplicatedPG对op的处理,(主要在ReplicatedPG层面上为op做了一些检查工作) ReplicatedPG::do_request(OpRequestRef& op, ThreadPool::TPHandle &handle) 1.can_discard_request(op)判断该op是否可以丢弃。 2.判断当前pg是否在flush,如果是则将op加入到waiting_for_peered队列中等待。 3.如果PG还灭有peered,检查pgbackend是否能够对该op处理,若不行就将其放入waiting_for_peered队列中,否则就进入Replicated::handle_message()处理。 4.检查pgbacnehd能否处理该请求,如果能处理,就处理。 5.根据op的类型,选择对应的处理方法处理op。假设:CEPH_MSG_OSD_OP : do_op(op) do_op主要检查相关的对象的状态是否正常,并获取ObjectContext,OpContext相关的上下文信息 ReplicatedPG::do_op(OpRequestRef& op) 1.如果是incudes_pg_op操作则,调用pg_op_must_wait检查该操作是否需要等待,如果需要等待,加入waiting_for_all_missing队列,如果不需要等待,调用do_pg_op处理该op。 2.检查操作的客户端是否在blacklisted中。 3.为message中的对象构建建head对象。 4.如果是顺序写,且对应对象写操作被srub阻塞了,则将op放入waiting_for_active队列中。 5.检查head对象是否missing:也就是处于缺失状态,需要恢复,若是则调用函数wait_for_unreadable_object加入相应的队列等待 6.如果是顺序写,检查是否head对象是否is_degraded_or_backfilling_object, 也就是正在recover状态,都需要调用wait_for_degraded_object加入相应的队列等待 7.检查要操作的对象是否blocked by snap;objects_blocked_on_degraded_snap队列里保存head对象,这些head对象在rollback到某一个snap对象时,snap对象处于缺失状态 ,就必须等待恢复,此时head对象不能写操作。objects_blocked_on_snap_promotion里的对象表示head对象rollback时,该对象在Cache pool层没有,需要Data pool层获取。 8.检查对象是否是顺序写入,且在objects_blocked_on_snap_promotion队列中,则调用wait_for_blocked_object函数做相应的处理。 9.为messege构建snapdir(hobject_t)对象,判断是否可读,如果不可读则调用wait_for_unreadable_object处理。 10.判断snapdir是否是degraded 对象,若是,则调用 wait_for_degraded_object()函数处理。 11.检查snapdir对象, 如果是写操作,就-EINVAL。只有读操作,才访问snapdir对象 12.构建对象oid,真正操作的对象,该对象时snapshot或者head。 ObjectContext: 13.检查该对象的IO是否在obc(ObjectContext)是被阻塞了。 14.调用函数find_object_context 获取该oid的ObjectContext实例。然后在根据不同的调用结果做检查。!!! 15.如果hit_set不为空,就需要设置hit_set. hitset, hit_set 和 agent_state都是Cache tier的机制。!!! 16.如果设置了agent_state, 就处理Cache 相关的信息。 17.获取object_locator。 18.检查IO 是否在在obc被阻塞。如果是调动wait_for_blocked_object(...)处理。 19.检查该对象的IO是否被其他对象阻塞。如果是则调用wait_for_degraded_object()处理。 20.获取src_obc对象,一个op带有多个osdop操作。!!! 21.如果是snapdir,需要所有的clone的objectContext。 如果是snapdir操作,就需要所有的clone对象,就构建所有的clone对象的objectContext,并把它加入的src_obs中 OpContext 22.构建OpContext对象。!!!OpContext保存对象在read/write过程中的所有状态。ObjectContext标记对象的变化过程。 23.execute_ctx(OpContext *ctx) 主要是把相关的操作封装成事务。 ReplicatedPG::execute_ctx(OpContext *ctx) 1.ctx->op_t = pgbackend->get_transaction(); 创建一个新的事务 ctx->op_t = pgbackend->get_transaction() 2.如果是写操作, 更新ctx->snapc 3.如果是读操作,就调用ObjectContext::ondisk_read_lock(); 为src_obc也调用ondisk_read_lock()。 4.int result = prepare_transaction(ctx)调用prepare_transaction。 5.如果是读操作,对应step 3,释放lock。 6.calc_trim_to();计算需要清除的pg log。!!! 7.检查op是否按照顺序处理的。 8.构建RepGather对象,调用issue_repop()发起replicat op操作请求。 9.eval_repop(repop);调用函数eval_repop,检查各个副本已经reply,做相应的操作 主要是想ReplicateBackend层提交事务和设置一些hook。 ReplicatedPG::issue_repop(RepGather *repop) 1.更新主pg的peer_info信息。 2.为obc/clone_obc/snapset_obc调用ondisk_write_lock() 3.调用 repop->ctx->apply_pending_attrs() 4.检查如果是 EC,是否可以rollback 5.创建回调对象(C_OSD_RepopCommit,C_OSD_RepopApplied,C_OSD_OndiskWriteUnlock),在op处理到不同阶段调用响应的hook。 6.pgbackend->submit_transaction(...) (进入PGBackend) 把打包好的事务发送给相关的其他osd,和自身处理该事务。 ReplicatedBackend::submit_transaction(...) 1.调用issue_op,向pg的其他副osd发送op请求。 2.调用parent->queue_transactions,修改自己,也就是主osd,也为该事务的处理通过 以上都在为一个op的最终处理做服务检查,osdmap检查、pg检查、object检查、封装成transaction而工作。 PGBackend::Listener(为回调对象的设置提供了接口) 的子类ReplicatedPG. 将事务提交各ObjectStore处理。 ReplicatedPG::queue_transactions(list<ObjectStore::Transaction*>& tls, OpRequestRef op = OpRequestRef()) ObjectStore::queue_transactions(...) FileStore::queue_transactions(...) 该过程中,共提供了3中同的选择,有日志的和没日志的,有日志的分日志和数据一起写和先写日志再写数据。依底层xfs为例,选择最后一种。 FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls, TrackedOpRef osd_op, ThreadPool::TPHandle *handle) 1.FileStore::build_op(...) 创建Op。 2.FileStore::_op_journal_transactions_prepare() 3.写日志FileStore::_op_journal_transactions(tbl, data_align, o->op, ondisk, osd_op);该函数中会调用FileJournal::submit_entry()该函数中会把请求放入到writeq和completions队列中。 FileJournal中的write_thread线程会处理writeq队列,把日志写入到日志盘中;do_aio_write(bl);//把取出的write_items写入到journal文件中。FileJournal中的write_finish_thread主要处理completions队列 判断aio请求是否完成,完成就他它从队列中移出。FileJournal::queue_completions_thru() -->finisher->queue(next.finish);将之前设置的回调对象放入finisher_queue;写返回。 4.写数据FileStore::queue_op(OpSequencer *osr, Op *o) -->osr->queue(o);op_wq.queue(osr); 把请求放入到FileStore的op_wq中。该队列会被threadpool处理。FileStore::OpWQ::_process(...) 5.FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle) 6. FileStore::_do_transactions(...)实现真实数据写入对象对应的文件中(_do_transaction(**p, op_seq, trans_num, handle);