ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Stateful Stream Processing
    Flink/Concepts 2024. 4. 21. 13:58

    https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/stateful-stream-processing/

     

    Stateful Stream Processing

    Stateful Stream Processing # What is State? # While many operations in a dataflow simply look at one individual event at a time (for example an event parser), some operations remember information across multiple events (for example window operators). These

    nightlies.apache.org

    Flink 1.17 document 기반으로 정리했습니다.

    What is State

    • dataflow에서 많은 operator들은 단순히 한 번에 하나의 개별 이벤트를 보지만 몇몇 operatoion들은 여러개의 이벤트를 기억 할 때가 있음 (ex. window) ⇒ 이러한 operations들을 stateful하다고 한다
    • statefule operations들의 종류
      • 애플리케이션이 특정한 이벤트 패턴을 찾을 때, state는 지금까지의 event를 sequence하게 저장
      • minute/hour/day 단위 이벤트를 aggregate할 때, state는 pending된 aggregate를 가짐
      • 스트림을 통해 기계 학습 모델을 훈련할 때, state는 현재 모델의 파라미터를 보유
      • 과거 데이터 관리가 필요할 때, state는 과거에 일어난 event에 대해서 효율적으로 접근 가능하게 함
    • checkpoint와 savepoint를 사용하여 fault tolerant하게 동작하도록 하기 위해 flink는 state를 인지할 수 있어야 함
    • Queryable State사용시 flink runtime에서 외부에서 state 접근 가능

    Keyed State

    • key / value store
    • stateful operator로 읽어질 때 파티셔닝 및 분배가 일어남
    • keyed streams에서만, 키로 파티셔닝 된 데이터 교환 후에 저장이 가능하고 현재의 이벤트 키값을 기준으로 state 저장
    • state 업데이트가 로컬에서만 일어나기(트랜젝션이 없음) 때문에 overhead가 많이 줄음
    • Key Group 이라는 것도 존재

    State Persistence

    • fault tolerance는 stream reply와 checkpointing을 사용하여 구현됨
    • 체크포인트는 각 operator의 상태와 입력 스트림의 point들을 저장
    • interval 수치는 recovery time과 trade off 관계 (interval이 클수록 recovery 해야 하는 state가 많아짐)
    • flink가 재시작 되면 가장 최근 checkpoint를 가져와 state를 복구

    ⇒ 문서에서는 checkpoint와 savepoint라는 의미를 같은 의미로 사용할 때가 있음

    Checkpointing

    Barriers

    • flink 분산 snapshot의 핵심 요소는 steam barrier이다
    • steam barrier는 데이터 스트림에 포함되어서 data stream일부로 처리가 됨
    • barrier는 데이터 스트림 레코드를 현재 스냅샷과 다음 스냅샷에 들어가는 레코드들을 분리
    • barrier는 동시간 대의 stream의 존재할 수 있어서 concurrently하게 스냅샷이 이뤄질 수 있음

    • source
      • barrier가 주입됨
      • kafka 같은 경우 파티션의 last offset에 대한 정보를 저장함 ⇒ Flink’s JobManager의 checkpoint coordinator에 reporting됨
    • flow downstream
      • 하나의 sink operator가 barrier n을 input stream에서 받았으면 checkpoint coordinator에게 알려주고 모든 sink가 snapshot을 알게 됐다면 작업이 완료 되었다고 판단
    • 스냅샷 n이 완료되면 작업은 더이상 source에 Sn 이전의 레코드를 요청하지 않음
    • 둘 이상의 입력 스트림에서 입력을 받는 경우 snapshot barriers안에서 정렬 되어야 함
      • 하나의 스트림에서 n을 처리했는데 다른 입력에서 온 n을 수신할 때 까지 해당 스트림의 추가 레코드를 처리할 수 없음, 그렇지 않으면 스냅샷 n에 속하는 레코드와 스냅샷 n+1에 속하는 레코드가 혼합됨
      • 마지막 스트림이 barrier n을 수신하면 보류중인 snapshot n을 전부 보냄
      • snapshot을 찍고 스트림 처리 재개
      • state backend에 비동기적으로 상태를 씀

    snapshotting operator state

    • operator의 state가 들어있다면 state 저장이 되어야 함
    • input stream에서 barrier가 모두 들어오면 state를 snapshot함 (output stream으로 보내기 전)
    • production에서는 state backend 저장소를 HDFS같은 대용량으로 저장할 수 있는 저장소로 사용 (defualt는 job manager 메모리)

    (statrbackend 저장소와 snapshot 저장소는 다른 개념인지 같은 개념인지 궁금)

    Recovery

    • 가장 최근 snapshot을 불러와서 처리 재개

    Unaligned Checkpointing

    • 아이디어는 체크포인트가 state가 되기 전 까지의 전송중인 데이터보다 우선순위가 높다는 것을 기반으로 구현

    • operator는 입력 버퍼에 저장된 첫 번째 barrier에 반응
    • barrier를 출력 버퍼의 끝에 추가하여 즉시 다운스트림 연산자로 전달
    • operator는 모든 추월된 레코드를 비동기식으로 저장하도록 표시하고 자체 상태의 스냅샷을 생성
    • I/O때문에 성능이 안좋을 수 있음

    (snapshot을 찍는다는 것은 전송중인 데이터 모두를 저장한다는 소리인지 궁금)

    • recovery는 aligned된 체크포인트와 동일한 방법으로 복구

    State Backends

    (checkpoint 저장도 해당되는지 궁금)

    • heap 메모리를 사용 할 수도 있고 RocksDB 사용을 할수도 있음 (key/value store로)
    • key/value state의 snapshot을 저장하는데도 사용되고 checkpoint의 일부인 snapshot을 저장하는데도 사용됨

    Savepoint

    • 모든 프로그램은 savepoint에서 실행 재개 가능
    • savepoint는 수동적으로 트리거되어서 생성되는 checkpoint

    'Flink > Concepts' 카테고리의 다른 글

    Overview  (0) 2024.04.21
Designed by Tistory.