개발/Spring Batch

Spring Batch 파티셔닝(partitioning) 예제(Database range query)

신매력 2016. 5. 25. 11:54

Batch job을 구성할 때 보통 reader - writer 구조로 작성한다.

reader에서 페이징으로 row들을 읽어서, 

writer에서 commit interval 만큼 단건을 write 하는 방식이다.


나는 reader에서 range로 읽어서 range로 한방에 write 하려는 목적을 가지고 찾아본 결과

파티셔닝이 적합해보여서 조사해보았다.


파티셔닝은 여러 일을 동시에 수행시킬 때 사용하기 유용하다.



1. 파티셔닝 (Partitioning)의 개념



파티셔닝이란, 병렬로 chunk 단위(각 slave들)을 동시에 수행하는 것이다.

* chunk : reader, processor, writer를 수행하는 단위 (tasklet으로 통째로 구현하는 것도 가능) 


Master가 Partitioner로 각 slave들이 수행할 값들로 쪼개놓고,

Slave들이 멀티쓰레드로 동시에 돌아가게 하는 것이다.


master도 한 스텝이고, slave도 별도의 스텝이다.


master 위 아래에 있는 각 step은 생략 및 추가 가능하다.



내가 보여줄 예제는 아래와 같은 구조가 될 것이다.

(Master step 전 후 step은 생략)

2. Job.xml 코드 예제


<batch:job id="partitioningTestJob">

<batch:step id="master">

<batch:partition step="slave" partitioner="partitioner">

<batch:handler grid-size="3" task-executor="taskExecutor" />

</batch:partition>

</batch:step>

</batch:job>

<bean id="partitioner" class="com.batch.ColumnRangePartitioner" scope="step">

<property name="min" value="#{jobParameters['min']}" />

<property name="max" value="#{jobParameters['max']}" />

</bean>

<step id="slave" xmlns="http://www.springframework.org/schema/batch">

<tasklet>

<chunk reader="partitioningTestItemReader" writer="partitioningTestItemWriter" commit-interval="1" />

</tasklet>

</step>


<!-- 각 slave들에서 수행할 reader -->

<bean id="partitioningTestItemReader" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">

<property name="dataSource" ref="commonDataSource"/>

<property name="queryProvider">

<bean class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">

<property name="dataSource" ref="commonDataSource"/>

<property name="selectClause" value="SELECT id, MAX(id) AS max, MIN(id) AS min"/>

<property name="fromClause" value="FROM table"/>

<property name="whereClause" value="WHERE id &gt;= :minValue AND id &lt;= :maxValue />

</bean>

</property>

<property name="parameterValues">

<map>

            <entry key="minValue" value="#{stepExecutionContext['minValue']}"/>

   <entry key="maxValue" value="#{stepExecutionContext['maxValue']}" />

</map>

</property>

<property name="pageSize" value="#{jobParameters['pageSize']}" />

<property name="rowMapper">

<bean class="org.springframework.jdbc.core.ColumnMapRowMapper"/>

</property>

</bean>


<!-- 각 slave들에서 수행할 writer -->

<bean id="partitioningTestItemWriter" 

class="org.springframework.batch.item.database.JdbcBatchItemWriter" scope="step">

    <property name="dataSource" ref="commonDataSource" />

      <property name="sql">

        <value>

            <![CDATA[

        UPDATE table

SET closed = 1

WHERE id between :min AND :max 

            ]]>

        </value>

       </property>

</bean>


grid-size는 Slave의 개수가 된다. 

파티셔너의 grid size는 필수 파라미터이다.


partitioner bean에 min, max 값을 외부에서 넣도록 해놓았는데

데이터의 총 Range를 의미한다.

나의 경우, min은 1, max는 60이 될 것이다. (위 그림 참조)


* 참고 : pageSize값을 외부에서 받게 했는데, 1보다 큰 수로 넣어야한다. 

1을 넣으면, 각 slave에서 무한루프를 돌게된다. 

reader의 결과 값은 MIN, MAX를 구하고 있기때문에 항상 1 row가 결과로 나오게 되고,

pageSize가 1이면 다음페이지가 있는지 확인하기 위해 무한루프 도는 것.


3. Partitioner 코드 예제


public class ColumnRangePartitioner implements Partitioner {

private Integer min;

private Integer max;

// TODO : min, max의 setter, getter

@Override

public Map<String, ExecutionContext> partition(int gridSize) {

int targetSize = (max - min) / gridSize + 1;

Map<String, ExecutionContext> result = new HashMap<>();

int number = 0;

int start = min;

int end = start + targetSize - 1;

 

while (start <= max) {

if (end >= max) {

end = max;

}

ExecutionContext value = new ExecutionContext();

value.putInt("minValue", start);

value.putInt("maxValue", end);

result.put("partition" + number, value);

start += targetSize;

end += targetSize;

number++;

}

return result;

}

}


외부에서 받은 min, max 파라미터의 값을 참조해서 

이 파티셔너에서 slave에 넘어갈 값을 정의해주는 부분이다.


result는 아래와 같이 된다.


{partition0={minValue=1, maxValue=20}, 

partition1={minValue=21, maxValue=40},

partition2={minValue=41, maxValue=60}} 


참고 :

- partition의 이름은 아무 이름으로 해도 상관 없으며 중복만 안되면 된다.

- 파티션 된 개수가 gridSize의 개수보다 넘치거나 모자라도 상관은 없다.

- minValue, maxValue의 범위의 개수가 모자라거나 넘쳐도 상관은 없다.





아래 사이트를 참고했습니다 

http://www.egovframe.go.kr/wiki/doku.php?id=egovframework:rte2:brte:batch_core:parallel_process