Linux 工作队列之工作者线程创建

    xiaoxiao2021-03-25  116

    前面介绍了工作队列相关数据结构定义,那么对于工作队列上的任务什么时候执行呢?以及相关 数据结构是什么呢? 工作队列中,执行任务的叫worker,这些worker构成worker_pool。所以有下面定义:

    /*  * The poor guys doing the actual heavy lifting.  All on-duty workers are  * either serving the manager role, on idle list or on busy hash.  For  * details on the locking annotation (L, I, X...), refer to workqueue.c.  *  * Only to be used in workqueue and async.  */ struct worker {  /* on idle list while idle, on busy hash table while busy */  union {   struct list_head entry; /* L: while idle */   struct hlist_node hentry; /* L: while busy */  };

     struct work_struct *current_work; /* L: work being processed */  work_func_t  current_func; /* L: current_work's fn */  struct pool_workqueue *current_pwq; /* L: current_work's pwq */  bool   desc_valid; /* ->desc is valid */  struct list_head scheduled; /* L: scheduled works */

     /* 64 bytes boundary on 64bit, 32 on 32bit */

     struct task_struct *task;  /* I: worker task */  struct worker_pool *pool;  /* I: the associated pool */       /* L: for rescuers */  struct list_head node;  /* A: anchored at pool->workers */       /* A: runs through worker->node */

     unsigned long  last_active; /* L: last active timestamp */  unsigned int  flags;  /* X: flags */  int   id;  /* I: worker id */

     /*   * Opaque string set with work_set_desc().  Printed out with task   * dump for debugging - WARN, BUG, panic or sysrq.   */  char   desc[WORKER_DESC_LEN];

     /* used only by rescuers to point to the target workqueue */  struct workqueue_struct *rescue_wq; /* I: the workqueue to rescue */ };

    /*  * Structure fields follow one of the following exclusion rules.  *  * I: Modifiable by initialization/destruction paths and read-only for  *    everyone else.  *  * P: Preemption protected.  Disabling preemption is enough and should  *    only be modified and accessed from the local cpu.  *  * L: pool->lock protected.  Access with pool->lock held.  *  * X: During normal operation, modification requires pool->lock and should  *    be done only from local cpu.  Either disabling preemption on local  *    cpu or grabbing pool->lock is enough for read access.  If  *    POOL_DISASSOCIATED is set, it's identical to L.  *  * A: pool->attach_mutex protected.  *  * PL: wq_pool_mutex protected.  *  * PR: wq_pool_mutex protected for writes.  Sched-RCU protected for reads.  *  * PW: wq_pool_mutex and wq->mutex protected for writes.  Either for reads.  *  * PWR: wq_pool_mutex and wq->mutex protected for writes.  Either or  *      sched-RCU for reads.  *  * WQ: wq->mutex protected.  *  * WR: wq->mutex protected for writes.  Sched-RCU protected for reads.  *  * MD: wq_mayday_lock protected.  */

    /* struct worker is defined in workqueue_internal.h */

    struct worker_pool {  spinlock_t  lock;  /* the pool lock */  int   cpu;  /* I: the associated cpu */  int   node;  /* I: the associated node ID */  int   id;  /* I: pool ID */  unsigned int  flags;  /* X: flags */

     unsigned long  watchdog_ts; /* L: watchdog timestamp */

     struct list_head worklist; /* L: list of pending works */  int   nr_workers; /* L: total number of workers */

     /* nr_idle includes the ones off idle_list for rebinding */  int   nr_idle; /* L: currently idle ones */

     struct list_head idle_list; /* X: list of idle workers */  struct timer_list idle_timer; /* L: worker idle timeout */  struct timer_list mayday_timer; /* L: SOS timer for workers */

     /* a workers is either on busy_hash or idle_list, or the manager */  DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);       /* L: hash of busy workers */

     /* see manage_workers() for details on the two manager mutexes */  struct mutex  manager_arb; /* manager arbitration */  struct worker  *manager; /* L: purely informational */  struct mutex  attach_mutex; /* attach/detach exclusion */  struct list_head workers; /* A: attached workers */  struct completion *detach_completion; /* all workers detached */

     struct ida  worker_ida; /* worker IDs for task name */

     struct workqueue_attrs *attrs;  /* I: worker attributes */  struct hlist_node hash_node; /* PL: unbound_pool_hash node */  int   refcnt;  /* PL: refcnt for unbound pools */

     /*   * The current concurrency level.  As it's likely to be accessed   * from other CPUs during try_to_wake_up(), put it in a separate   * cacheline.   */  atomic_t  nr_running ____cacheline_aligned_in_smp;

     /*   * Destruction of pool is sched-RCU protected to allow dereferences   * from get_work_pool().   */  struct rcu_head  rcu; } ____cacheline_aligned_in_smp;

    内核对worker创建由函数 struct worker *create_worker(struct worker_pool *pool)

    处理,worker->task这个绑定了一个内核线程。

    /**  * create_worker - create a new workqueue worker  * @pool: pool the new worker will belong to  *  * Create a new worker which is bound to @pool.  The returned worker  * can be started by calling start_worker() or destroyed using  * destroy_worker().  *  * CONTEXT:  * Might sleep.  Does GFP_KERNEL allocations.  *  * RETURNS:  * Pointer to the newly created worker.  */ static struct worker *create_worker(struct worker_pool *pool) {  struct worker *worker = NULL;  int id = -1;  char id_buf[16];

     lockdep_assert_held(&pool->manager_mutex);

     /*   * ID is needed to determine kthread name.  Allocate ID first   * without installing the pointer.   */  idr_preload(GFP_KERNEL);  spin_lock_irq(&pool->lock);

     id = idr_alloc(&pool->worker_idr, NULL, 0, 0, GFP_NOWAIT);

     spin_unlock_irq(&pool->lock);  idr_preload_end();  if (id < 0)   goto fail;

     worker = alloc_worker();  if (!worker)   goto fail;

     worker->pool = pool;  worker->id = id;

     if (pool->cpu >= 0)   snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,     pool->attrs->nice < 0  ? "H" : "");  else   snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);

     worker->task = kthread_create_on_node(worker_thread, worker, pool->node,            "kworker/%s", id_buf);  if (IS_ERR(worker->task))   goto fail;

     /*   * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any   * online CPUs.  It'll be re-applied when any of the CPUs come up.   */  set_user_nice(worker->task, pool->attrs->nice);  set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);

     /* prevent userland from meddling with cpumask of workqueue workers */  worker->task->flags |= PF_NO_SETAFFINITY;

     /*   * The caller is responsible for ensuring %POOL_DISASSOCIATED   * remains stable across this function.  See the comments above the   * flag definition for details.   */  if (pool->flags & POOL_DISASSOCIATED)   worker->flags |= WORKER_UNBOUND;

     /* successful, commit the pointer to idr */  spin_lock_irq(&pool->lock);  idr_replace(&pool->worker_idr, worker, worker->id);  spin_unlock_irq(&pool->lock);

     return worker;

    fail:  if (id >= 0) {   spin_lock_irq(&pool->lock);   idr_remove(&pool->worker_idr, id);   spin_unlock_irq(&pool->lock);  }  kfree(worker);  return NULL; }

    任务唤醒函数:

    static void wake_up_worker(struct worker_pool *pool) {  struct worker *worker = first_worker(pool);

     if (likely(worker))   wake_up_process(worker->task); }

     

       
    转载请注明原文地址: https://ju.6miu.com/read-6183.html

    最新回复(0)