ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Data Pipelines & ETL
    Flink/Learn Flink 2024. 4. 15. 20:55

    https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/learn-flink/etl/

     

    Data Pipelines & ETL

    Data Pipelines & ETL # One very common use case for Apache Flink is to implement ETL (extract, transform, load) pipelines that take data from one or more sources, perform some transformations and/or enrichments, and then store the results somewhere. In thi

    nightlies.apache.org

    다음 공식 문서를 읽고 필요 내용을 정리, Flink 1.17 버전 기준으로 정리했습니다.

     

    • Flink의 흔한 용도는 ETL(extract, transform, load) 파이프라인 구현을 하기 위해서 사용된다. 
      • 다양한 source에서 데이터를 가져와서 변형하거나 Data enrichment를 수행
    • ETL 용도로 Flink 사용시 Table이나 SQL API 사용시 적합

    1. Stateless Transformations

    flink training repo 기반으로 예시들이 설명 - https://github.com/apache/flink-training/tree/release-1.17/

     

    GitHub - apache/flink-training: Apache Flink Training Excercises

    Apache Flink Training Excercises. Contribute to apache/flink-training development by creating an account on GitHub.

    github.com

    map()

    example

    GeoUtils 클래스를 이용해서 정보를 Enrich (위도, 경도 정보를 이용해서 Grid 셀에 매핑)

    public static class EnrichedRide extends TaxiRide {
        public int startCell;
        public int endCell;
    
        public EnrichedRide() {}
    
        public EnrichedRide(TaxiRide ride) {
            this.rideId = ride.rideId;
            this.isStart = ride.isStart;
            ...
            this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
            this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
        }
    
        public String toString() {
            return super.toString() + "," +
                Integer.toString(this.startCell) + "," +
                Integer.toString(this.endCell);
        }
    }
    DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
    
    DataStream<EnrichedRide> enrichedNYCRides = rides
        .filter(new RideCleansingSolution.NYCFilter())
        .map(new Enrichment());
    
    enrichedNYCRides.print();
    public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
    
        @Override
        public EnrichedRide map(TaxiRide taxiRide) throws Exception {
            return new EnrichedRide(taxiRide);
        }
    }

     

    flatmap()

    • map function은 one-to-one transformation 수행
    • flatmap은 one-to-many transformation 수행
    DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
    
    DataStream<EnrichedRide> enrichedNYCRides = rides
        .flatMap(new NYCEnrichment());
    
    enrichedNYCRides.print();
    public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
    
        @Override
        public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
            FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
            if (valid.filter(taxiRide)) {
                out.collect(new EnrichedRide(taxiRide));
            }
        }
    }

     

    2. Keyed Streams

    keyBy()

    • attributes로 파티션을 나눌 때 유용한 점 존재
    • example
      • 정해진 start 범위에서  가장 길게 택시를 탑승한 내역을 구하고자 할때 KeyBy로 startCell로 repartition해두면 start 범위내 데이터들을 정렬된 상태로 처리가 가능하다. (GROUP BY의 기능을 수행)
    rides
        .flatMap(new NYCEnrichment())
        .keyBy(enrichedRide -> enrichedRide.startCell);

    • keyBy는 네트워크 부하가 있는데 파티션간 shuffle이 일어나면서 직렬화와 역직렬화가 일어나기 때문

    Keys are computed 

    • KeySelector를 지정하는데 제한은 없고 hashCode() 메소드와 equals() 메소드는 구현이 되어 있어야 한다.
      • Enum, Array, random number 생성을 하는 것은 배제를 하지만 Tuple이나 POJO를 사용하여 복합 키를 만들어 낼 수 있음

    Aggregations on Keyed Streams

    • 다음 코드는 startCellrhk duration을 가지는 새로운 스트림을 만들어 냄

    import org.joda.time.Interval;
    
    DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
        .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
    
            @Override
            public void flatMap(EnrichedRide ride,
                                Collector<Tuple2<Integer, Minutes>> out) throws Exception {
                if (!ride.isStart) {
                    Interval rideInterval = new Interval(ride.startTime, ride.endTime);
                    Minutes duration = rideInterval.toDuration().toStandardMinutes();
                    out.collect(new Tuple2<>(ride.startCell, duration));
                }
            }
        });
    • 위에서 만들어진 새로운 스트림을 이용하면 각각의 startCell마다 가장 많이 택시 탑승한 이력들을 뽑아낼 수 있음
    • 다음 코드로 keyBy를 이용해서 startCell마다 그룹을 지어주고 maxBy로 group마다 가장 큰 duration이력을 print할 수 있음
    minutesByStartCell
      .keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
      .maxBy(1) // duration
      .print();
    ...
    4> (64549,5M)
    4> (46298,18M)
    1> (51549,14M)
    1> (53043,13M)
    1> (56031,22M)
    1> (50797,6M)
    ...
    1> (50797,8M)
    ...
    1> (50797,11M)
    ...
    1> (50797,12M)
    • aggregation 할 때 주의사항
      • flink application에서 state 사용을 할 시 unbounded 데이터를 처리 할 때 잘못하면 state 양이 무제한으로 많아질 수 있어서 전체 스트림에 대한 정보를 저장하기 보다는 window 형식으로 처리하는 것을 권장함
    • 위의 aggregation 메소드는 maxBy() 메소드를 예시로했지만  reduce() 메소드 또한 aggregation 메소드 예시이다.

    3.Stateful Transformations

    Why is Flink Involved in Managing State?

    Flink는 내부에서 관리하는 State 이외에 직접 사용하는 것을 권장한다.

    • local: local machine에 유지하기 때문에 memory 접근 속도로 state에 접근이 가능하다.
    • durable: fault-tolerant 기능을 가지고 있음. checkpoint로 일정한 interval마다 저장이 되고 실패시 복구 때 다시 사용이 가능
    • vertically scalable: 내장된 RocksDB를 사용하면서 이는 scale out이 가능함
    • horizontally scalable: Flink cluster가 크거나 줄어들때 state 재분배가 일어남
    • queryable: Flink state에서는 외부에서 쿼리로 조회가 가능하다. (Queryable State API 사용)

    Rich Functions

    FilterFunction, MapFunction, FlatmapFunction: Single Abstract Method pattern 예시

    이러한 인터페이스들에서 Flinksms "rich" 변형된 인터페이스들을 제공하는데 다음과 같은 추상적인 메소드들을 제공

    • open(Configuration c)
      • Initialize 할때 호출됨
      • static data를 load 하거나 외부 서비스 connection 할때 등으로 사용됨
    • close()
    • getRunctimeContext()
      • Flink에서 관리하는 state를 만들고 접근할 수 있음

    An Example with Keyed State

    RichFlatMapFunction를 상속받는 Deduplicator를 구현해서 de-dup행위를 하는 코드 example

    private static class Event {
        public final String key;
        public final long timestamp;
        ...
    }
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
        env.addSource(new EventSource())
            .keyBy(e -> e.key)
            .flatMap(new Deduplicator())
            .print();
      
        env.execute();
    }
    public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
        ValueState<Boolean> keyHasBeenSeen;
    
        @Override
        public void open(Configuration conf) {
            ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
            keyHasBeenSeen = getRuntimeContext().getState(desc);
        }
    
        @Override
        public void flatMap(Event event, Collector<Event> out) throws Exception {
            if (keyHasBeenSeen.value() == null) {
                out.collect(event);
                keyHasBeenSeen.update(true);
            }
        }
    }

    keyBy로 key마다 데이터 셋을 분배하고 key에 대한 데이터가 유입 되었다는 것을 state에 저장한 다음 유입이 된 이력이 있으면 더이상 데이터를 보내지 않음

     

     

    'Flink > Learn Flink' 카테고리의 다른 글

    DataStream API Intro  (0) 2024.04.07
    OverView  (0) 2024.03.31
Designed by Tistory.