Post

RxJS (feat.반응형 프로그래밍)

RxJs

개요

RxJs의 용도는?

1
2
Think of RxJS as Lodash for events.
(RxJs를 이벤트용 Lodash로 생각하라)

처음에 RxJs의 공식 문서를 살펴보면 이게 정확히 무슨 용도로 쓰이고 어떤 효용성이 있는지 파악하기 힘들다.

작성자도 같은 생각이었는지 위와 같이 설명한다.

내가 생각하는 RxJs의 용도는 간단히 말하면 비동기 코드와 시간 관련된 코드를 쉽고 간결하게 작성하게 도와주는 것 이다.

만약에 아래조건으로 자동완성을 처리를 해야된다고 생각해보자

  1. 타이핑 할 때마다 서버에 데이터를 받아서 보여주세요
  2. 너무 잦은 요청은 서버에 부하를 줄 수 있으니 타이핑 간격이 좁으면 대기하다가 입력이 늦어지면 그 때 요청해주세요
  3. 같은 내용일 때는 요청하지 마세요
  4. 일정 시간 동안 응답이 없으면 3회 재시도하고 그래도 응답이 없으면 에러메시지를 출력해주세요
  5. 데이터는 캐시로 보관하여 먼저 보여주고 요청이 완료되면 변경된 데이터를 표시해주세요

위와 같은 조건을 Async/Await 혹은 Promise를 이용한다고 생각해보자 굉장히 복잡하고 어려운 코드를 만들게 될거다.

RxJS를 이용했을 때 코드는 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
let autoCompleteRef;
const inputElement = document.createElement("input");
const request = (data: any) => fromFetch("url");
const input$ = fromEvent < HTMLInputElement > (inputElement, "input"); // Event -> Observable

input$
  .pipe(
    map((event) => event.value),
    distinctUntilChanged(), // 3 - 중복되는 요청 막기
    debounceTime(500), // 2 - 연속되는 타이핑 제어
    mergeMap((text) =>
      request(text).pipe(
        map((res) => res.text),
        tap((value) => (autoCompleteRef = value)), // autoCompleteElement 처리
        timeout(7000), // 4 - 7초동안 서버의 응답이 없으면 timeout 처리
        retry(3), // 3번 재시도
        catchError((e) => e)
      )
    )
  )
  .subscribe();

코드를 보면 떠오르는게 있다! 마치 Array가 떠오르지 않나? 그래서 많은 사람들이 rxjs를 비동기 처리를 Array처럼 처리 할 수 있게 해주는 라이브러리라고 칭하기도 한다.

저 처리들을 모두 async/await 혹은 Promise로 처리한다고 생각해보자 저거보다 코드를 더 작성해야할 것이고 가독성도 좋지 않을 것이고, subscribe와 같은 observable 처리는 또 따로 해줘야 할 것이다. 물론 부가적인 기능이 더 있을 수 있지만 주로 이런 처리를 위해 RxJs를 사용한다.

EveryThing is a Stream

Stream이란 말은 프론트엔드 개발자에게 굉장히 친숙하다. stream은 실시간으로 계속 변경되는 데이터를 Chunk 단위로 받아 처리하는 것을 뜻한다. 예를 들면 Netflix와 같은 OTT 동영상 데이터를 받아 처리하는 것이나 화면 상에 움직이는 마우스 좌표 사용자가 입력하는 키보드 값 등 거의 모든 것을 Stream으로 표현할 수 있다.

반응형 프로그래밍 (Pull, Push)

반응형 프로그래밍은 생각보다 우리 주변에서 쉽게 접할 수 있다.

가장 쉬운 예로는 Excel이 있다! 가령 A1+B1 이라는 수식을 C1에 적어둔다면 A1과 B1이 변경될 때마다 C1이 알아서 반응하여 숫자를 변경한다.

그리고 Javascript에서 addEventListener 또한 반응형이다! event를 등록하고, event가 발생할 때 반응하여 여러 작용을 하는 방식이다.

반응형 프로그래밍은 Push 방식의 데이터 패러다임을 가지고 있다.

Pull

Pull은 Promise가 제일 대표적이다. 사용자가 필요할 때마다 데이터를 요청해서 사용하는 방식이다. 레거시 데이터나 중복 요청이 있을 수 있다.

Push

Push는 Subscribe된 값이 변경될 때마다 전파된 데이터를 받아 항상 최신의 데이터를 유지할 수 있다.

Observable (생성자)

Observable은 RxJS에서 가장 중심이 되는 역할을 한다. observer 을 인자로 받고 함수를 반환하며, Subscribe할 수 있는 역할이다.

Observable은 Promise와 같이 하나의 객체로 생각해야되며, Observable은 기본적으로 일반 function과 같이 동기적으로 작동한다.

하지만 함수와 다른점은 반환 에 있다.

비동기 적 반환과, 동기식으로 여러 개의 값을 반환할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 일반 함수
function foo() {
  return 42;
  return 100; // 죽은 코드이며 실행할 수 없는 코드다
}

import { Observable } from "rxjs";
// Observable

const foo = new Observable((subscribe) => {
  console.log("test");
  subscribe.next("1"); // 이런식으로 여러 값을 리턴할 수 있다.
  subscribe.next("10");
  subscribe.next("10");
  interval(3000, () => {
    subscribe.next("result"); // 비동기 반환
  });
});

foo.subscribe((x) => {
  console.log(x);
});

output

1
2
3
4
5
6
"test"
"1"
"10"
"10"
3  ..
"result"

subscribe

생성된 observable에 대한 subscribe 를 정의 할 수 있고, 전달될 데이터의 콜백과 다를게 없다.

Subscription 이라는 객체를 반환하며 이 객체는 오직 unsubscribe를 처리하는 용도로만 사용된다.

Observable 실행

next

실제 데이터를 Subscription에게 전달하는 함수.

Error

예외가 발생했을 때 Subscription에게 전달하는 함수 Observable Execution 동안 한 번의 실행만 가능

Complete

실행이 다 완료되었을 때 전달하는 함수 Error와 마찬가지로 한 번의 실행만 가능하며 둘 중에 하나만 있을 수 있다.

Observable 실행 중지

observable.subscribe() 실행시 Subscription이 반환된다. 그 반환된 객체로 unsubscribe 처리를 해주면 된다.

1
subscription.unsubscribe();

Observer (소비자)

Observer은 Observable에서 전달되는 값을 유형별로 처리하는 콜백 세트이다.

next, error, complete 과 같이 Observable에서 실행하는 함수와 같다.

Operator (연산자)

Operator는 함수를 실행하고 Observable을 반환하는 함수이다.

연산자는 두 가지 종류가 있다.

Pipe 가능한 연산자

해당 연산자는 Observable에 파이프할 수 있는 종류이다. 여기에는 filter, mergeMap이 포함된다. 그리고 구독 로직이 첫 번 째 Observable을 기반으로하는 새로운 Observable을 반환한다.

Pipe

Pipe는 Rxjs 5.5부터 도입된 개념이고, RxJs의 범위가 커지면서 Method방식은 Tree-Shaking에 불리하여 Pipe같은 Operator 함수로 분리하면 import를 한 만큼만 번들링이 되기 때문에 번들링의 크기를 줄일 수 있게 된다.

만약에 클릭이 3번 이상 일어났을 때 어떠한 동작을 한다고 생각하면 Dot Chain 방식에서는 아래와같은 코드를 사용할 수 있다.

1
2
3
import { Observable } from 'rxjs'

const obs = fromEvent(window,"click").bufferTime(250).filter(click => click.length === 3).subscribe(~~)

Pipe 연산자로 그대로 옮긴 다면 다음과 같다.

1
2
3
4
5
6
7
8
const pipe$ = fromEvent(window, "click")
  .pipe(
    bufferTime(2500),
    filter((click) => click.length >= 3)
  )
  .subscribe((value) => {
    console.log(`value:${value}`);
  });

Create Operator (생선 연산자)

미리 정의된 몇 가지 일반적인 동작을 사용하거나 다른 Observable을 결합하여 Observable을 생성하는데 사용할 수 있는 함수들이다.

예를 들어 Interval이 있습니다.

1
2
// Observable 생성
const observable = interval(1000);

더 많은 것들은 공식문서를… Event도 있고 데이터끼리 join을 해서 Observable을 생성하는 것도 있습니다.

Cold Observable

Observable 내에서 데이터를 생산하는 경우를 일컫는 말이다.

Observable은 기본적으로 무언가가 구독할 때만 값을 실행하게 되는데 각 구독자에 대해서 새로운 실행을 시작하므로 데이터가 공유 되지 안는다.

코드로 예를 든다면 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
import * as Rx from "rxjs"

const observable = of(0,1).pipe(
	next(Math.random());
)

observable.subscribe((dt) => {
	console.log(dt); // 0.235314123324
});

observable.subscribe((dt) => {
	console.log(dt); // 0.712341231124
});

구독자에 대해서 새로운 실행을 수행하기 때문에 구독자가 받는 데이터는 다르게 되는 동작이다.

Cold 방식은 애니메이션이나 계속해서 함수가 생성되어 변화를 감지해야하는 것들이 어울린다.

Hot Observable

Hot 방식은 Cold와는 반대로 Observable 외부에서 데이터를 받는 방식이다.

코드로 예를 든다면 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import * as Rx from "rxjs"

const random = Math.random();
const observable = of(0,1).pipe(
	next(random);
)

observable.subscribe((dt) => {
	console.log(dt); // 0.235314123324
});

observable.subscribe((dt) => {
	console.log(dt); // 0.235314123324
});

구독자가 있든 없든 데이터가 생성되기때문에 구독자가 없다면 쓸 때없는 데이터에 메모리를 사용하는 경우가 될 수 있다.

Hot 방식은 API를 호출하거나 상태관리를 할 때 어울리는 방식이다.

SubJect

Subject는 Observable과 Observer 역할 모두 수행할 수 있으며, subscribe할 수 있으며 동시에 pipe도 가능하다.

Subject로 Hot Observable을 사용할 수 있다.

Subject의 기능은 여러 Observer에 멀티캐스트할 수 있다.

코드로 예를 든다면 아래와 같다.

1
2
3
4
5
const subject = new Subject();
subject.subscribe((val) => console.log(val)); // 0.123123
subject.subscribe((val) => console.log(val)); // 0.123123

subject.next(Math.random());

UseObservable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import { useEffect, useMemo, useRef, useState } from "react";
import { BehaviorSubject, combineLatest, from, map } from "rxjs";

const useObservedValue = (observable: any) => {
  const [value, setValue] = useState();

  const subject = useRef(new BehaviorSubject(observable));

  useEffect(() => {
    subject.current.next(value);

    return () => {
      subject.current.unsubscribe();
    };
  }, [value]);

  return useMemo(() => subject.current.asObservable(), [subject]);
};

const test = () => {
  const [title, setTitle] = useState<string>("");
  const myObservaed = useObservedValue("value");

  myObservaed.subscribe((value) => {
    console.log(value);
  });
};

API 호출 시 유용한 함수

Promise 값이나 iterator을 가진 값은 From 메소드로 Observable 형식으로 변환시킬 수 있다.

ForkJoin

ForkJoin에 모든 Observable이 완료되는 것을 기다린 후 해당 Observable의 마지막값들을 return한다.

Subject에서 여러 API를 호출한 뒤에 하나의 결과로 모아볼 수 있는 API이다.

예제 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { forkJoin, of, timer } from "rxjs";

const observable = forkJoin({
  foo: of(1, 2, 3, 4),
  bar: Promise.resolve(8),
  baz: timer(4000),
});

observable.subscribe({
  next: (value) => console.log(value),
  complete: () => console.log("This is how it ends!"),
});

// Logs:
// { foo: 4, bar: 8, baz: 0 } after 4 seconds
// 'This is how it ends!' immediately after

MergeMap

ForkJoin과는 다르게 Observable 중 하나가 완료될 때 마다 ID를 사용하고 반응이 가능한 API

예제코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import { of, mergeMap, interval, map } from "rxjs";

const letters = of("a", "b", "c");
const result = letters.pipe(
  mergeMap((x) => interval(1000).pipe(map((i) => x + i)))
);

result.subscribe((x) => console.log(x));

// Logs:
// a1
// b1
// c1
// a2
// b2
// c2
// a3
// b3
// c3

CombineLatest

Observables 배열에 전달된 모든 Observables의 값을 결합한다.

@akanass/rx-http-request

API를 호출하여 바로 Observable을 반환하여 Subscribe에게 전달할 수 있는 라이브러리이다.

This post is licensed under CC BY 4.0 by the author.