ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • DataStream API Intro
    Flink/Learn Flink 2024. 4. 7. 14:31

    https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/learn-flink/datastream_api/

     

    Intro to the DataStream API

    Intro to the DataStream API # The focus of this training is to broadly cover the DataStream API well enough that you will be able to get started writing streaming applications. What can be Streamed? # Flink’s DataStream APIs will let you stream anything

    nightlies.apache.org

    다음 공식 문서를 읽고 필요 내용을 정리, Flink 1.17 버전 기준으로 정리했습니다.

     

    1. What can be Streamed?

    • 직렬화가 가능한 것들은 스트림을 할수 있음
    • Flink는 다음과 같은 직렬화 도구들을 가지고 있음
      • basic types, i.e., String, Long, Integer, Boolean, Array
      • composite types: Tuples, POJOs, and Scala case classes
      • 다음 두가지 경우를 말고는 Kyro 라이브러리 사용
    • 기본 설정 직렬화 도구 이외 다른 직렬화 도구 사용 가능
      • 특히 Avro는 지원이 잘됨

    Java tuples and POJOs

    • 자바 직렬화 도구는 tuple과 POJO에서 효율적으로 작동이 됨 
    • tuple
      • flink는 tuple 0 ~ tuple25가 정의되어 있음
    Tuple2<String, Integer> person = Tuple2.of("Fred", 35);
    
    // zero based index!  
    String name = person.f0;
    Integer age = person.f1;
    • POJOs
      • Flink는 다음 조건들이 충족되면 데이터 유형을 POJO 유형으로 인식
        • public 클래스이고 standalone(non-static inner class가 없는 경우)
        • public class 이고 no-argument 생성자가 있는 경우
        • 클래스 안 non-static, non-transient 필드들이 public (상속된 모든 클래스들 포함) 이고 getter-setter 메소드들이 Java beans naming conventions을 지키는 경우
      • Flink 직렬화는 POJO Type에 Schema Evolution을 지원
    // POJO Class Example
    public class Person {
        public String name;  
        public Integer age;  
        public Person() {}
        public Person(String name, Integer age) {  
            . . .
        }
    }  
    
    Person person = new Person("Fred Flintstone", 35);

    2.  A Complete Example

    Stream execution environment

    • Flink는 execution enviroment 설정이 필요
    • Streaming application은 StreamExecutionEnvironment를 사용해야 함
    •  DataStream API call 은 job graph에 만들어지고 job graph는 StreamExecutionEnvironment에 있음
      • env.execute()이 호출되면 graph가 패키징되고 잡 매니저로 보내짐
      • 잡 매니저는 잡을 병렬화 하고 taskManager로 잡을 분산시킴
      • 잡의 병렬화 한 부분부분은 task slot이라고 함

    보통의 스트림의 경우 Source (데이터를 가져옴) -> 처리 -> Sink (데이터를 보낸) 처리 순으로 진행이 됨

    code example

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.api.common.functions.FilterFunction;
    
    public class Example {
    
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
    		
            // 데이터 source (다른 곳에서 데이터를 가져오지 않고 만드는 예제)
            DataStream<Person> flintstones = env.fromElements(
                    new Person("Fred", 35),
                    new Person("Wilma", 35),
                    new Person("Pebbles", 2));
            
            // 데이터 처리 (processing)
            DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
                @Override
                public boolean filter(Person person) throws Exception {
                    return person.age >= 18;
                }
            });
    
            // 데이터 sink (다른 곳으로 데이터를 보내지 않고 단순히 print함)
            adults.print();
    
            env.execute();
        }
    
        public static class Person {
            public String name;
            public Integer age;
            public Person() {}
    
            public Person(String name, Integer age) {
                this.name = name;
                this.age = age;
            }
    
            public String toString() {
                return this.name.toString() + ": age " + this.age.toString();
            }
        }
    }

    Basic Stream Source

    • StreamExecutionEnvironment의 fromCollection(Collection) 메소드 사용 예시
    List<Person> people = new ArrayList<Person>();
    
    people.add(new Person("Fred", 35));
    people.add(new Person("Wilma", 35));
    people.add(new Person("Pebbles", 2));
    
    DataStream<Person> flintstones = env.fromCollection(people);
    • socket이나 file로도 DataStream을 만들 수 있음
    DataStream<String> lines = env.socketTextStream("localhost", 9999);
    
    DataStream<String> lines = env.readTextFile("file:///path");
    • 보통 data source는 Apache Kafka, Kinesis, 다양한 파일 시스템을 사용하고(low-latency, high throughput 처리를 하기 위해 분산처리에 유용) data 정보를 enrichment를 하기 위해 rest api나 database를 사용하기도 함

    Basic Stream Sink

    • 위의 code example에서는 데이터를 print함
    • production에서는 보통 File적재 (FileSink), 다양한 데이터베이스 시스템, 몇몇 pub-sub 시스템으로 Sink 하는 식으로 보통 구현

    참고자료

    flink training repo - https://github.com/apache/flink-training/tree/release-1.17/

    'Flink > Learn Flink' 카테고리의 다른 글

    Data Pipelines & ETL  (0) 2024.04.15
    OverView  (0) 2024.03.31
Designed by Tistory.