Java自定义线程池和线程总数控制

    xiaoxiao2021-03-25  99

    1 概述

    池化是常见的思想,线程池是非常典型的池化的实现,《Java并发编程实战》也大篇幅去讲解了Java中的线程池。本文实现一个简单的线程池。

    2 核心类

    【1】接口定义

    [java]  view plain  copy  print ? public interface IThreadPool<Job extends Runnable> {       /**       * 关闭线程池       */       public void shutAlldown();          /**       * 执行任务       *        * @param job 任务       */       public void execute(Job job);          /**       * 添加工作者       *        * @param addNum 添加数       */       public void addWorkers(int addNum);          /**       * 减少工作者       *        * @param reduceNum 减少数目       */       public void reduceWorkers(int reduceNum);   }   【2】实现类

    线程池的核心是维护了1个任务列表和1个工作者列表

    [java]  view plain  copy  print ? import java.util.ArrayList;   import java.util.Collections;   import java.util.LinkedList;   import java.util.List;      public class XYThreadPool<Job extends Runnable> implements IThreadPool<Job> {          // 默认线程数       private static int DEAFAULT_SIZE = 5;       // 最大线程数       private static int MAX_SIZE = 10;          // 任务列表       private LinkedList<Job> tasks = new LinkedList<Job>();       // 工作线程列表       private List<Worker> workers = Collections               .synchronizedList(new ArrayList<Worker>());          /**       * 默认构造函数       */       public XYThreadPool() {           initWokers(DEAFAULT_SIZE);       }          /**       * 执行线程数       *        * @param threadNums 线程数       */       public XYThreadPool(int workerNum) {           workerNum = workerNum <= 0 ? DEAFAULT_SIZE                   : workerNum > MAX_SIZE ? MAX_SIZE : workerNum;           initWokers(workerNum);       }          /**       * 初始化线程池       *        * @param threadNums 线程数       */       public void initWokers(int threadNums) {           for (int i = 0; i < threadNums; i++) {               Worker worker = new Worker();               worker.start();               workers.add(worker);           }           // 添加关闭钩子           Runtime.getRuntime().addShutdownHook(new Thread() {               public void run() {                   shutAlldown();               }           });       }          @Override       public void shutAlldown() {           for (Worker worker : workers) {               worker.shutdown();           }       }          @Override       public void execute(Job job) {           synchronized (tasks) {               // 提交任务就是将任务对象加入任务队列,等待工作线程去处理               tasks.addLast(job);               tasks.notifyAll();           }       }          @Override       public void addWorkers(int addNum) {           // 新线程数必须大于零,并且线程总数不能大于最大线程数           if ((workers.size() + addNum) <= MAX_SIZE && addNum > 0) {               initWokers(addNum);           } else {               System.out.println("addNum too large");           }       }          @Override       public void reduceWorkers(int reduceNum) {           if ((workers.size() - reduceNum <= 0))               System.out.println("thread num too small");           else {               // 暂停指定数量的工作者               int count = 0;               while (count != reduceNum) {                   for (Worker w : workers) {                       w.shutdown();                       count++;                   }               }           }       }          /**       * 工作线程       */       class Worker extends Thread {              private volatile boolean flag = true;              @Override           public void run() {               while (flag) {                   Job job = null;                   // 加锁(若只有一个woker可不必加锁,那就是所谓的单线程的线程池,线程安全)                   synchronized (tasks) {                       // 任务队列为空                       while (tasks.isEmpty()) {                           try {                               // 阻塞,放弃对象锁,等待被notify唤醒                               tasks.wait();                               System.out.println("block when tasks is empty");                           } catch (InterruptedException e) {                               e.printStackTrace();                           }                       }                       // 不为空取出任务                       job = tasks.removeFirst();                       System.out.println("get job:" + job + ",do biz");                       job.run();                   }               }           }              public void shutdown() {               flag = false;           }       }   }  

    1 当调用wait()方法时线程会放弃对象锁,进入等待此对象的等待锁定池,只有针对此对象调用notify()方法后本线程才进入对象锁定池准备

    2 Object的方法:void notify(): 唤醒一个正在等待该对象的线程。void notifyAll(): 唤醒所有正在等待该对象的线程。notifyAll使所有原来在该对象上等待被notify的线程统统退出wait状态,变成等待该对象上的锁,一旦该对象被解锁,它们会去竞争。notify只是选择一个wait状态线程进行通知,并使它获得该对象上的锁,但不惊动其它同样在等待被该对象notify的线程们,当第一个线程运行完毕以后释放对象上的锁,此时如果该对象没有再次使用notify语句,即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,继续处在wait状态,直到这个对象发出一个notify或notifyAll,它们等待的是被notify或notifyAll,而不是锁

    3 无需控制线程总数 每调用一次就会创建一个拥有10个线程工作者的线程池。 [java]  view plain  copy  print ? public class TestService1 {       public static void main(String[] args) {           // 启动10个线程           XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);           pool.execute(new Runnable() {               @Override               public void run() {                   System.out.println("====1 test====");               }           });        }   }      public class TestService2 {       public static void main(String[] args) {           // 启动10个线程           XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);           pool.execute(new Runnable() {               @Override               public void run() {                   System.out.println("====2 test====");               }           });       }   }  

    4 控制线程总数 希望在项目中所有的线程调用,都共用1个固定工作者数大小的线程池

    [java]  view plain  copy  print ? import javax.annotation.PostConstruct;   import org.springframework.stereotype.Component;   import com.xy.pool.XYThreadPool;      /**   * 统一线程池管理类    */   @Component   public class XYThreadManager {          private XYThreadPool<Runnable> executorPool;          @PostConstruct       public void init() {           executorPool = new XYThreadPool<Runnable>(10);       }          public XYThreadPool<Runnable> getExecutorPool() {           return executorPool;       }   }      import org.springframework.beans.factory.annotation.Autowired;   import org.springframework.stereotype.Service;      @Service("testService3")   public class TestService3 {              @Autowired       private XYThreadManager threadManager;              public void test() {           threadManager.getExecutorPool().execute(new Runnable() {               @Override               public void run() {                   System.out.println("====3 test====");               }           });       }   }      import org.springframework.beans.factory.annotation.Autowired;   import org.springframework.stereotype.Service;      @Service("testService4")   public class TestService4 {              @Autowired       private XYThreadManager threadManager;              public void test() {           threadManager.getExecutorPool().execute(new Runnable() {               @Override               public void run() {                   System.out.println("====4 test====");               }           });       }   }      import org.springframework.context.ApplicationContext;   import org.springframework.context.support.ClassPathXmlApplicationContext;      public class TestMain {          @SuppressWarnings("resource")       public static void main(String[] args) {           ApplicationContext atc = new ClassPathXmlApplicationContext("applicationContext.xml");              TestService3 t3 = (TestService3) atc.getBean("testService3");           t3.test();              TestService4 t4 = (TestService4) atc.getBean("testService4");           t4.test();       }      }  
    转载请注明原文地址: https://ju.6miu.com/read-21096.html

    最新回复(0)