-
Data Pipelines & ETLFlink/Learn Flink 2024. 4. 15. 20:55
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/learn-flink/etl/
다음 공식 문서를 읽고 필요 내용을 정리, Flink 1.17 버전 기준으로 정리했습니다.
- Flink의 흔한 용도는 ETL(extract, transform, load) 파이프라인 구현을 하기 위해서 사용된다.
- 다양한 source에서 데이터를 가져와서 변형하거나 Data enrichment를 수행
- ETL 용도로 Flink 사용시 Table이나 SQL API 사용시 적합
1. Stateless Transformations
flink training repo 기반으로 예시들이 설명 - https://github.com/apache/flink-training/tree/release-1.17/
map()
example
GeoUtils 클래스를 이용해서 정보를 Enrich (위도, 경도 정보를 이용해서 Grid 셀에 매핑)
public static class EnrichedRide extends TaxiRide { public int startCell; public int endCell; public EnrichedRide() {} public EnrichedRide(TaxiRide ride) { this.rideId = ride.rideId; this.isStart = ride.isStart; ... this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat); this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat); } public String toString() { return super.toString() + "," + Integer.toString(this.startCell) + "," + Integer.toString(this.endCell); } }
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...)); DataStream<EnrichedRide> enrichedNYCRides = rides .filter(new RideCleansingSolution.NYCFilter()) .map(new Enrichment()); enrichedNYCRides.print();
public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> { @Override public EnrichedRide map(TaxiRide taxiRide) throws Exception { return new EnrichedRide(taxiRide); } }
flatmap()
- map function은 one-to-one transformation 수행
- flatmap은 one-to-many transformation 수행
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...)); DataStream<EnrichedRide> enrichedNYCRides = rides .flatMap(new NYCEnrichment()); enrichedNYCRides.print();
public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> { @Override public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception { FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter(); if (valid.filter(taxiRide)) { out.collect(new EnrichedRide(taxiRide)); } } }
2. Keyed Streams
keyBy()
- attributes로 파티션을 나눌 때 유용한 점 존재
- example
- 정해진 start 범위에서 가장 길게 택시를 탑승한 내역을 구하고자 할때 KeyBy로 startCell로 repartition해두면 start 범위내 데이터들을 정렬된 상태로 처리가 가능하다. (GROUP BY의 기능을 수행)
rides .flatMap(new NYCEnrichment()) .keyBy(enrichedRide -> enrichedRide.startCell);
- keyBy는 네트워크 부하가 있는데 파티션간 shuffle이 일어나면서 직렬화와 역직렬화가 일어나기 때문
Keys are computed
- KeySelector를 지정하는데 제한은 없고 hashCode() 메소드와 equals() 메소드는 구현이 되어 있어야 한다.
- Enum, Array, random number 생성을 하는 것은 배제를 하지만 Tuple이나 POJO를 사용하여 복합 키를 만들어 낼 수 있음
Aggregations on Keyed Streams
- 다음 코드는 startCellrhk duration을 가지는 새로운 스트림을 만들어 냄
import org.joda.time.Interval; DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() { @Override public void flatMap(EnrichedRide ride, Collector<Tuple2<Integer, Minutes>> out) throws Exception { if (!ride.isStart) { Interval rideInterval = new Interval(ride.startTime, ride.endTime); Minutes duration = rideInterval.toDuration().toStandardMinutes(); out.collect(new Tuple2<>(ride.startCell, duration)); } } });
- 위에서 만들어진 새로운 스트림을 이용하면 각각의 startCell마다 가장 많이 택시 탑승한 이력들을 뽑아낼 수 있음
- 다음 코드로 keyBy를 이용해서 startCell마다 그룹을 지어주고 maxBy로 group마다 가장 큰 duration이력을 print할 수 있음
minutesByStartCell .keyBy(value -> value.f0) // .keyBy(value -> value.startCell) .maxBy(1) // duration .print();
... 4> (64549,5M) 4> (46298,18M) 1> (51549,14M) 1> (53043,13M) 1> (56031,22M) 1> (50797,6M) ... 1> (50797,8M) ... 1> (50797,11M) ... 1> (50797,12M)
- aggregation 할 때 주의사항
- flink application에서 state 사용을 할 시 unbounded 데이터를 처리 할 때 잘못하면 state 양이 무제한으로 많아질 수 있어서 전체 스트림에 대한 정보를 저장하기 보다는 window 형식으로 처리하는 것을 권장함
- 위의 aggregation 메소드는 maxBy() 메소드를 예시로했지만 reduce() 메소드 또한 aggregation 메소드 예시이다.
3.Stateful Transformations
Why is Flink Involved in Managing State?
Flink는 내부에서 관리하는 State 이외에 직접 사용하는 것을 권장한다.
- local: local machine에 유지하기 때문에 memory 접근 속도로 state에 접근이 가능하다.
- durable: fault-tolerant 기능을 가지고 있음. checkpoint로 일정한 interval마다 저장이 되고 실패시 복구 때 다시 사용이 가능
- vertically scalable: 내장된 RocksDB를 사용하면서 이는 scale out이 가능함
- horizontally scalable: Flink cluster가 크거나 줄어들때 state 재분배가 일어남
- queryable: Flink state에서는 외부에서 쿼리로 조회가 가능하다. (Queryable State API 사용)
Rich Functions
FilterFunction, MapFunction, FlatmapFunction: Single Abstract Method pattern 예시
이러한 인터페이스들에서 Flinksms "rich" 변형된 인터페이스들을 제공하는데 다음과 같은 추상적인 메소드들을 제공
- open(Configuration c)
- Initialize 할때 호출됨
- static data를 load 하거나 외부 서비스 connection 할때 등으로 사용됨
- close()
- getRunctimeContext()
- Flink에서 관리하는 state를 만들고 접근할 수 있음
An Example with Keyed State
RichFlatMapFunction를 상속받는 Deduplicator를 구현해서 de-dup행위를 하는 코드 example
private static class Event { public final String key; public final long timestamp; ... } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new EventSource()) .keyBy(e -> e.key) .flatMap(new Deduplicator()) .print(); env.execute(); }
public static class Deduplicator extends RichFlatMapFunction<Event, Event> { ValueState<Boolean> keyHasBeenSeen; @Override public void open(Configuration conf) { ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN); keyHasBeenSeen = getRuntimeContext().getState(desc); } @Override public void flatMap(Event event, Collector<Event> out) throws Exception { if (keyHasBeenSeen.value() == null) { out.collect(event); keyHasBeenSeen.update(true); } } }
keyBy로 key마다 데이터 셋을 분배하고 key에 대한 데이터가 유입 되었다는 것을 state에 저장한 다음 유입이 된 이력이 있으면 더이상 데이터를 보내지 않음
'Flink > Learn Flink' 카테고리의 다른 글
DataStream API Intro (0) 2024.04.07 OverView (0) 2024.03.31 - Flink의 흔한 용도는 ETL(extract, transform, load) 파이프라인 구현을 하기 위해서 사용된다.