Getting Started with Spring Batch

    xiaoxiao  2021-12-14

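    Concepts

    Spring Batch is an offline batch-processing framework. It can batch process data held in text files, XML files, and persistent stores such as databases. It also provides a rich set of interfaces that can be extended to handle other data formats. It targets small to medium data volumes (for massive data sets, consider Hadoop). Typical use cases include bill reconciliation, data migration, and scheduled bulk updates.

    The layered architecture is shown in the figure above. It has three layers: application, core, and infrastructure. The application layer contains all batch jobs and the code written by developers. The core layer contains the classes needed to run a job at runtime, for example the JobLauncher, Job, and Step implementations. Both the application and core layers sit on top of the infrastructure layer, which contains common readers and writers as well as services such as RetryTemplate.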

    Job Configuration

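    JobRepository: the repository that stores job metadata, either in memory or persisted in a database.
    JobLauncher: the class that launches a job.
    Job: the job configuration; the core class of the domain model.
    JobInstance: a logical run (instance) of a job.
    JobParameter: a parameter passed to the job.
    JobExecution: a single execution attempt of a job. One JobInstance can be executed several times, and each attempt gets its own JobExecution.
    step: the configuration of one step of a job.
    chunk: the configuration of a data-processing chunk.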
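
    The relationship between JobInstance and JobExecution can be sketched in plain Java. This is only a conceptual model, not Spring Batch code: the class JobModelSketch and its methods are hypothetical, and it assumes that an instance is identified by the job name plus its parameters, with each run attempt recorded as one execution status.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual model only: hypothetical names, NOT the Spring Batch implementation.
final class JobModelSketch {

    // One entry per "JobInstance" (job name + parameters); the list holds the
    // status of each "JobExecution" recorded for that instance.
    private final Map<String, List<String>> executionsByInstance = new HashMap<>();

    // Records one execution for the instance identified by jobName + parameters
    // and returns how many executions that instance has had so far.
    int recordExecution(String jobName, Map<String, String> parameters, String status) {
        // Sort the parameters so the key does not depend on insertion order.
        String instanceKey = jobName + new TreeMap<String, String>(parameters).toString();
        List<String> executions =
                executionsByInstance.computeIfAbsent(instanceKey, k -> new ArrayList<>());
        executions.add(status);
        return executions.size();
    }

    // Number of distinct instances seen so far.
    int instanceCount() {
        return executionsByInstance.size();
    }

    public static void main(String[] args) {
        JobModelSketch repository = new JobModelSketch();
        Map<String, String> day1 = new HashMap<>();
        day1.put("date", "2021-12-14");
        Map<String, String> day2 = new HashMap<>();
        day2.put("date", "2021-12-15");

        // Same job name and parameters: same instance, two executions.
        System.out.println(repository.recordExecution("exportCustomerJob", day1, "FAILED"));    // 1
        System.out.println(repository.recordExecution("exportCustomerJob", day1, "COMPLETED")); // 2
        // Different parameters: a new instance with its own first execution.
        System.out.println(repository.recordExecution("exportCustomerJob", day2, "COMPLETED")); // 1
        System.out.println(repository.instanceCount());                                         // 2
    }
}
```

    In this model a failed run that is retried with the same parameters adds a second execution to the same instance, while changing a parameter starts a new instance.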

    Data Processing

    ItemReader: reads the data.
    ItemProcessor: processes the data that was read.
    ItemWriter: writes the data.
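
    How the three roles cooperate inside a chunk can be illustrated without the framework. The following is a simplified sketch, assuming items are read one at a time until a chunk is full, each item is then processed, and the whole chunk is handed to the writer at once. ChunkSketch and runChunks are hypothetical names, and transactions, skipping, and retry are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Simplified, framework-free model of chunk-oriented processing.
// ChunkSketch and runChunks are hypothetical names, not Spring Batch APIs.
final class ChunkSketch {

    // Reads up to chunkSize items, processes each one, then "writes" the
    // processed chunk as a whole. Returns the chunks in the order written.
    static <I, O> List<List<O>> runChunks(Iterator<I> reader,
                                          Function<I, O> processor,
                                          int chunkSize) {
        List<List<O>> written = new ArrayList<>();
        while (reader.hasNext()) {
            // Read phase: stands in for repeated ItemReader calls.
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // Process phase: stands in for the ItemProcessor.
            List<O> outputs = new ArrayList<>();
            for (I item : items) {
                outputs.add(processor.apply(item));
            }
            // Write phase: stands in for one ItemWriter call per chunk.
            written.add(outputs);
        }
        return written;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<List<String>> result = runChunks(source.iterator(), i -> "item-" + i, 3);
        // prints [[item-1, item-2, item-3], [item-4, item-5, item-6], [item-7]]
        System.out.println(result);
    }
}
```

    Seven items with a chunk size of three produce three writes, the last one smaller than the others.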

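    Configuration Example

    The following example bulk-imports rows from the customer table into the customer2 table. The customer table is defined as follows: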

    CREATE TABLE `customer` (
      `id` int(11) NOT NULL,
      `name` varchar(45) NOT NULL,
      `age` int(11) NOT NULL,
      `address` varchar(45) NOT NULL DEFAULT '',
      `code` varchar(45) NOT NULL DEFAULT '',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1;

    Configuring the jobRepository

    Configuration for storing the job metadata in MySQL:

    <!-- connect to database -->
    <bean id="jobMetaDataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost:3306/spring_batch_job"/>
        <property name="username" value="root"/>
        <property name="password" value="123456"/>
    </bean>

    <!-- Use only when creating the job-meta tables for the first time -->
    <!-- Create job-meta tables automatically -->
    <jdbc:initialize-database data-source="jobMetaDataSource">
        <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql"/>
        <jdbc:script location="org/springframework/batch/core/schema-mysql.sql"/>
    </jdbc:initialize-database>

    <bean id="myDataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost:3306/example"/>
        <property name="username" value="root"/>
        <property name="password" value="123456"/>
        <property name="maxActive" value="100"/>
        <property name="maxWait" value="1000"/>
        <property name="defaultAutoCommit" value="true"/>
        <property name="validationQuery" value="SELECT 1"/>
        <property name="initialSize" value="30"/>
        <property name="minIdle" value="30"/>
        <property name="testWhileIdle" value="true"/>
        <!--<property name="testOnBorrow" value="true"/>-->
        <!--<property name="testOnReturn" value="true"/>-->
        <!--<property name="removeAbandoned" value="true"/>-->
        <!--<property name="removeAbandonedTimeout" value="1800"/>-->
        <property name="timeBetweenEvictionRunsMillis" value="7200000"/>
        <property name="connectionInitSqls" value="set names utf8mb4"/>
    </bean>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="jobMetaDataSource"/>
    </bean>

    <!-- stored job-metadata in database -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="jobMetaDataSource"/>
        <property name="transactionManager" ref="transactionManager"/>
        <property name="databaseType" value="mysql"/>
    </bean>

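    The job metadata can also be kept in memory. Note that MapJobRepositoryFactoryBean is deprecated as of Spring Batch 4.3 and removed in 5.0, so this variant applies to older versions only: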

    <!-- stored job-metadata in memory -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager"/>
    </bean>

    Configuring the Job

    <batch:job id="exportCustomerJob">
        <batch:step id="onStep">
            <batch:tasklet>
                <batch:chunk reader="reader" writer="writer" commit-interval="5"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="reader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <property name="dataSource" ref="myDataSource"/>
        <property name="sql" value="select * from customer"/>
        <property name="rowMapper">
            <bean class="com.ydoing.spring.batch.CustomerRowMapper"/>
        </property>
    </bean>

    <bean id="writer" class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="myDataSource"/>
        <property name="sql">
            <value>
                <![CDATA[
                replace into customer2(id, age, name, address, code)
                values (:id, :age, :name, :address, :code)
                ]]>
            </value>
        </property>
        <!-- It will take care matching between object property and sql name parameter -->
        <property name="itemSqlParameterSourceProvider">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
        </property>
    </bean>

    RowMapper

    Source of the CustomerRowMapper class:

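    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.springframework.jdbc.core.RowMapper;

    // Maps one row of the customer table to a Customer object.
    public class CustomerRowMapper implements RowMapper<Customer> {

        @Override
        public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
            Customer customer = new Customer();
            customer.setId(rs.getLong(Customer.ID));
            customer.setAge(rs.getInt(Customer.AGE));
            customer.setName(rs.getString(Customer.NAME));
            // Read the address from the row; the original passed the column-name constant itself.
            customer.setAddress(rs.getString(Customer.ADDRESS));
            customer.setCode(rs.getString(Customer.CODE));
            return customer;
        }
    }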
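
    The mapper above depends on a Customer class that the source does not show. What follows is a minimal sketch, not the author's class. It assumes that the constants ID, AGE, NAME, ADDRESS, and CODE hold the column names of the customer table, and that the bean exposes getters matching the :id, :age, :name, :address, and :code parameters in the writer's SQL. The field types are guesses based on the getLong, getInt, and getString calls in the mapper.

```java
// Hypothetical sketch: the source never shows Customer, so every detail
// below is an assumption inferred from CustomerRowMapper and the writer SQL.
// Package-private here so the sketch compiles from any file name; a real
// project would probably declare it public in Customer.java.
class Customer {

    // Assumed to be the column names of the customer table.
    public static final String ID = "id";
    public static final String AGE = "age";
    public static final String NAME = "name";
    public static final String ADDRESS = "address";
    public static final String CODE = "code";

    private long id;
    private int age;
    private String name;
    private String address;
    private String code;

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }

    public String getCode() { return code; }
    public void setCode(String code) { this.code = code; }

    @Override
    public String toString() {
        return "Customer{id=" + id + ", age=" + age + ", name=" + name
                + ", address=" + address + ", code=" + code + "}";
    }

    public static void main(String[] args) {
        Customer customer = new Customer();
        customer.setId(1L);
        customer.setAge(30);
        customer.setName("Ann");
        customer.setAddress("Main St");
        customer.setCode("A1");
        System.out.println(customer);
    }
}
```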

    Running the Job

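    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    public class App {

        public static void main(String[] args) {
            ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");

            // Requires a bean named "jobLauncher" in applicationContext.xml (not shown above).
            JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
            Job job = (Job) context.getBean("exportCustomerJob");

            try {
                JobExecution execution = jobLauncher.run(job, new JobParameters());
                // getStatus() returns the BatchStatus of this execution.
                System.out.println("Exit Status : " + execution.getStatus());
            } catch (Exception e) {
                e.printStackTrace();
            }

            System.out.println("Done");
        }
    }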
