java – Spring批处理从多个来源读取作业

前端之家收集整理的这篇文章主要介绍了java – Spring批处理从多个来源读取作业前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
如何从多个数据库中读取项目?我已经知道这是可能的文件.
以下示例适用于从多个文件读取
...
<job id="readMultiFileJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="step1">
    <tasklet>
        <chunk reader="multiResourceReader" writer="flatFileItemWriter"
            commit-interval="1" />
    </tasklet>
    </step>
</job>
...
<bean id="multiResourceReader"
    class=" org.springframework.batch.item.file.MultiResourceItemReader">
    <property name="resources" value="file:csv/inputs/domain-*.csv" />
    <property name="delegate" ref="flatFileItemReader" />
</bean>
...

这样的三个豆子.

<bean id="database2" class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="name" value="database2Reader" />
    <property name="dataSource" ref="dataSource2" />
    <property name="sql" value="select image from object where image like '%/images/%'" />
    <property name="rowMapper">
        <bean class="sym.batch.ImagesRowMapper2" />
    </property>
</bean>

解决方法

没有一个即用的组件来执行你所要求的;唯一的解决方案是编写一个自定义的ItemReader&代表JdbcCursorItemReader(或HibernateCursorItemReader或任何通用的ItemReader实现).
您需要准备所有必需的东西(数据源,会话,真实数据库读取器),并将所有委派读者绑定到您的定制阅读器.

编辑:
您需要使用ItemReader.read()和mantain reader的注释来模拟循环,并在作业重新启动时委派状态.

class MyItemReader<T> implements ItemReader<T>,ItemStream {
  private ItemReader[] delegates;
  private int delegateIndex;
  private ItemReader<T> currentDelegate;
  private ExecutionContext stepExecutionContext;

  public void setDelegates(ItemReader[] delegates) {
    this.delegates = delegates;
  }

  @BeforeStep
  private void beforeStep(StepExecution stepExecution) {
    this.stepExecutionContext = stepExecution.getExecutionContext();
  }

  public T read() {
    T item = null;
    if(null != currentDelegate) {
      item = currentDelegate.read();
      if(null == item) {
        ((ItemStream)this.currentDelegate).close();
        this.currentDelegate = null;
      }
    }
    // Move to next delegate if prevIoUs was exhausted!
    if(null == item && this.delegateIndex< this.delegates.length) {
      this.currentDelegate = this.delegates[this.currentIndex++];
      ((ItemStream)this.currentDelegate).open(this.stepExecutionContext);
      update(this.stepExecutionContext);
      // Recurse to read() to simulate loop through delegates
      item = read();
    }
    return item;
  }

  public void open(ExecutionContext ctx) {
    // During open restore last active reader and restore its state
    if(ctx.containsKey("index")) {
      this.delegateIndex = ctx.getInt("index");
      this.currentDelegate = this.delegates[this.delegateIndex];
      ((ItemStream)this.currentDelegate ).open(ctx);
    }
  }

  public void update(ExecutionContext ctx) {
    // Update current delegate index and state
    ctx.putInt("index",this.delegateIndex);
    if(null != this.currentDelegate) {
      ((ItemStream)this.currentDelegate).update(ctx);
    }
  }

  public void close(ExecutionContext ctx) {
    if(null != this.currentDelegate) {
      ((ItemStream)this.currentDelegate).close();
  }
}
<bean id="myItemReader" class=path.to.MyItemReader>
  <property name="delegates">
    <array>
      <ref bean="itemReader1"/>
      <ref bean="itemReader2"/>
      <ref bean="itemReader3"/>
    </array>
  </property>
</bean>

编辑2:记住设置属性名称;这是必需的,让MyItemReader.read()正常工作

<bean id="itemReader1" class="JdbcCursorItemReader">
  <property name="name" value="itemReader1" />
  <!-- Set other properties -->
</bean>

猜你在找的Java相关文章