-
Stateful Stream ProcessingFlink/Concepts 2024. 4. 21. 13:58
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/stateful-stream-processing/
Flink 1.17 document 기반으로 정리했습니다.
What is State
- dataflow에서 많은 operator들은 단순히 한 번에 하나의 개별 이벤트를 보지만 몇몇 operatoion들은 여러개의 이벤트를 기억 할 때가 있음 (ex. window) ⇒ 이러한 operations들을 stateful하다고 한다
- statefule operations들의 종류
- 애플리케이션이 특정한 이벤트 패턴을 찾을 때, state는 지금까지의 event를 sequence하게 저장
- minute/hour/day 단위 이벤트를 aggregate할 때, state는 pending된 aggregate를 가짐
- 스트림을 통해 기계 학습 모델을 훈련할 때, state는 현재 모델의 파라미터를 보유
- 과거 데이터 관리가 필요할 때, state는 과거에 일어난 event에 대해서 효율적으로 접근 가능하게 함
- checkpoint와 savepoint를 사용하여 fault tolerant하게 동작하도록 하기 위해 flink는 state를 인지할 수 있어야 함
- Queryable State사용시 flink runtime에서 외부에서 state 접근 가능
Keyed State
- key / value store
- stateful operator로 읽어질 때 파티셔닝 및 분배가 일어남
- keyed streams에서만, 키로 파티셔닝 된 데이터 교환 후에 저장이 가능하고 현재의 이벤트 키값을 기준으로 state 저장
- state 업데이트가 로컬에서만 일어나기(트랜젝션이 없음) 때문에 overhead가 많이 줄음
- Key Group 이라는 것도 존재
State Persistence
- fault tolerance는 stream reply와 checkpointing을 사용하여 구현됨
- 체크포인트는 각 operator의 상태와 입력 스트림의 point들을 저장
- interval 수치는 recovery time과 trade off 관계 (interval이 클수록 recovery 해야 하는 state가 많아짐)
- flink가 재시작 되면 가장 최근 checkpoint를 가져와 state를 복구
⇒ 문서에서는 checkpoint와 savepoint라는 의미를 같은 의미로 사용할 때가 있음
Checkpointing
- fault tolerance 매커니즘의 핵심은 분산 데이터 스트림과 operator state의 일관된 스냅샷을 그리는 것
- snapshot을 그리는 원리는 다음 논문을 참고
Barriers
- flink 분산 snapshot의 핵심 요소는 steam barrier이다
- steam barrier는 데이터 스트림에 포함되어서 data stream일부로 처리가 됨
- barrier는 데이터 스트림 레코드를 현재 스냅샷과 다음 스냅샷에 들어가는 레코드들을 분리
- barrier는 동시간 대의 stream의 존재할 수 있어서 concurrently하게 스냅샷이 이뤄질 수 있음
- source
- barrier가 주입됨
- kafka 같은 경우 파티션의 last offset에 대한 정보를 저장함 ⇒ Flink’s JobManager의 checkpoint coordinator에 reporting됨
- flow downstream
- 하나의 sink operator가 barrier n을 input stream에서 받았으면 checkpoint coordinator에게 알려주고 모든 sink가 snapshot을 알게 됐다면 작업이 완료 되었다고 판단
- 스냅샷 n이 완료되면 작업은 더이상 source에 Sn 이전의 레코드를 요청하지 않음
- 둘 이상의 입력 스트림에서 입력을 받는 경우 snapshot barriers안에서 정렬 되어야 함
- 하나의 스트림에서 n을 처리했는데 다른 입력에서 온 n을 수신할 때 까지 해당 스트림의 추가 레코드를 처리할 수 없음, 그렇지 않으면 스냅샷 n에 속하는 레코드와 스냅샷 n+1에 속하는 레코드가 혼합됨
- 마지막 스트림이 barrier n을 수신하면 보류중인 snapshot n을 전부 보냄
- snapshot을 찍고 스트림 처리 재개
- state backend에 비동기적으로 상태를 씀
snapshotting operator state
- operator의 state가 들어있다면 state 저장이 되어야 함
- input stream에서 barrier가 모두 들어오면 state를 snapshot함 (output stream으로 보내기 전)
- production에서는 state backend 저장소를 HDFS같은 대용량으로 저장할 수 있는 저장소로 사용 (defualt는 job manager 메모리)
(statrbackend 저장소와 snapshot 저장소는 다른 개념인지 같은 개념인지 궁금)
Recovery
- 가장 최근 snapshot을 불러와서 처리 재개
Unaligned Checkpointing
- 아이디어는 체크포인트가 state가 되기 전 까지의 전송중인 데이터보다 우선순위가 높다는 것을 기반으로 구현
- operator는 입력 버퍼에 저장된 첫 번째 barrier에 반응
- barrier를 출력 버퍼의 끝에 추가하여 즉시 다운스트림 연산자로 전달
- operator는 모든 추월된 레코드를 비동기식으로 저장하도록 표시하고 자체 상태의 스냅샷을 생성
- I/O때문에 성능이 안좋을 수 있음
(snapshot을 찍는다는 것은 전송중인 데이터 모두를 저장한다는 소리인지 궁금)
- recovery는 aligned된 체크포인트와 동일한 방법으로 복구
State Backends
(checkpoint 저장도 해당되는지 궁금)
- heap 메모리를 사용 할 수도 있고 RocksDB 사용을 할수도 있음 (key/value store로)
- key/value state의 snapshot을 저장하는데도 사용되고 checkpoint의 일부인 snapshot을 저장하는데도 사용됨
Savepoint
- 모든 프로그램은 savepoint에서 실행 재개 가능
- savepoint는 수동적으로 트리거되어서 생성되는 checkpoint