-
DataStream API IntroFlink/Learn Flink 2024. 4. 7. 14:31
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/learn-flink/datastream_api/
다음 공식 문서를 읽고 필요 내용을 정리, Flink 1.17 버전 기준으로 정리했습니다.
1. What can be Streamed?
- 직렬화가 가능한 것들은 스트림을 할수 있음
- Flink는 다음과 같은 직렬화 도구들을 가지고 있음
- basic types, i.e., String, Long, Integer, Boolean, Array
- composite types: Tuples, POJOs, and Scala case classes
- 다음 두가지 경우를 말고는 Kyro 라이브러리 사용
- 기본 설정 직렬화 도구 이외 다른 직렬화 도구 사용 가능
- 특히 Avro는 지원이 잘됨
Java tuples and POJOs
- 자바 직렬화 도구는 tuple과 POJO에서 효율적으로 작동이 됨
- tuple
- flink는 tuple 0 ~ tuple25가 정의되어 있음
Tuple2<String, Integer> person = Tuple2.of("Fred", 35); // zero based index! String name = person.f0; Integer age = person.f1;
- POJOs
- Flink는 다음 조건들이 충족되면 데이터 유형을 POJO 유형으로 인식
- public 클래스이고 standalone(non-static inner class가 없는 경우)
- public class 이고 no-argument 생성자가 있는 경우
- 클래스 안 non-static, non-transient 필드들이 public (상속된 모든 클래스들 포함) 이고 getter-setter 메소드들이 Java beans naming conventions을 지키는 경우
- Flink 직렬화는 POJO Type에 Schema Evolution을 지원
- Flink는 다음 조건들이 충족되면 데이터 유형을 POJO 유형으로 인식
// POJO Class Example public class Person { public String name; public Integer age; public Person() {} public Person(String name, Integer age) { . . . } } Person person = new Person("Fred Flintstone", 35);
2. A Complete Example
Stream execution environment
- Flink는 execution enviroment 설정이 필요
- Streaming application은 StreamExecutionEnvironment를 사용해야 함
- DataStream API call 은 job graph에 만들어지고 job graph는 StreamExecutionEnvironment에 있음
- env.execute()이 호출되면 graph가 패키징되고 잡 매니저로 보내짐
- 잡 매니저는 잡을 병렬화 하고 taskManager로 잡을 분산시킴
- 잡의 병렬화 한 부분부분은 task slot이라고 함
보통의 스트림의 경우 Source (데이터를 가져옴) -> 처리 -> Sink (데이터를 보낸) 처리 순으로 진행이 됨
code example
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.api.common.functions.FilterFunction; public class Example { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 데이터 source (다른 곳에서 데이터를 가져오지 않고 만드는 예제) DataStream<Person> flintstones = env.fromElements( new Person("Fred", 35), new Person("Wilma", 35), new Person("Pebbles", 2)); // 데이터 처리 (processing) DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() { @Override public boolean filter(Person person) throws Exception { return person.age >= 18; } }); // 데이터 sink (다른 곳으로 데이터를 보내지 않고 단순히 print함) adults.print(); env.execute(); } public static class Person { public String name; public Integer age; public Person() {} public Person(String name, Integer age) { this.name = name; this.age = age; } public String toString() { return this.name.toString() + ": age " + this.age.toString(); } } }
Basic Stream Source
- StreamExecutionEnvironment의 fromCollection(Collection) 메소드 사용 예시
List<Person> people = new ArrayList<Person>(); people.add(new Person("Fred", 35)); people.add(new Person("Wilma", 35)); people.add(new Person("Pebbles", 2)); DataStream<Person> flintstones = env.fromCollection(people);
- socket이나 file로도 DataStream을 만들 수 있음
DataStream<String> lines = env.socketTextStream("localhost", 9999); DataStream<String> lines = env.readTextFile("file:///path");
- 보통 data source는 Apache Kafka, Kinesis, 다양한 파일 시스템을 사용하고(low-latency, high throughput 처리를 하기 위해 분산처리에 유용) data 정보를 enrichment를 하기 위해 rest api나 database를 사용하기도 함
Basic Stream Sink
- 위의 code example에서는 데이터를 print함
- production에서는 보통 File적재 (FileSink), 다양한 데이터베이스 시스템, 몇몇 pub-sub 시스템으로 Sink 하는 식으로 보통 구현
참고자료
flink training repo - https://github.com/apache/flink-training/tree/release-1.17/
'Flink > Learn Flink' 카테고리의 다른 글
Data Pipelines & ETL (0) 2024.04.15 OverView (0) 2024.03.31