최근 빅데이터 플랫폼에서 인메모리 방식의 Spark의 활용도가 점점 높아지고 있습니다. Spark은 분산 환경에서 동작하기 때문에 일반 프로그래밍과 다른 측면이 있습니다. Spark을 사용하기에 앞서 어떤 내용을 이해해야 하는지 알아봅시다.


1. Spark란?

Spark는 독립적으로 홀로 존재하지 않는다. Spark와 상호작용하는 빅데이터 기술들을 이해하기 위해 하둡 에코시스템을 먼저 살펴보자. 크게 데이터 전송, 분산 파일 시스템, 분산 데이터 처리, 운영 관리 레이어로 구분된다. 각 레이어마다 데이터 타입, 저장 방식 등에 따라 다양한 기술이 있다. 참고로 Sqoop 시작하기 글에서 HDFS를 기준으로 데이터 저장/처리 흐름을 확인할 수 있다.

분산 컴퓨팅을 위해서 기본적으로 분산 파일 저장소와 분산 데이터 처리를 독립적으로 구분해서 관리한다. 하둡 1.0은 HDFS와 MapReduce만을 가지고, 하둡 2.0은 여기에 리소스 관리자인 YARN이 추가된다. 이 세 가지가 하둡 핵심 컴포넌트이고 이를 중심으로 대용량 데이터를 효과적으로 저장 및 처리하기 위해서 여러 레이어들이 등장했다. 레이어마다 각자 고유한 특징과 역할이 있다.

Spark은 Tez, MapReduce, PIG 등 다양한 분산 데이터 처리기 중 하나일 뿐이다. 분산 데이터 처리기는 분산 파일 시스템과 항상 짝을 이뤄야 한다. HDFS는 MapReduce 또는 Spark 또는 Tez와 짝이 될 수 있고, Spark은 HDFS 또는 Cassandra와 짝이될 수 있다.

Spark의 특징은 무엇일까? Spark은 MapReduce의 단점을 개선하기 위해 등장했다. MapReduce의 단점은 데이터 처리 단계 사이의 중간 데이터가 디스크에 잔류하는 것이다. 반면, Spark은 중간 데이터를 메모리에 둘 수 있도록 하였다. 이는 데이터를 재사용할 수 있는 능력을 부여함으로써 반복형이나 대화형 테스크에 강점을 가진다. Spark의 실행 과정은 위 그림과 같다[3]. MapReduce는 위 그림에서 메모리 셀이 없다고 보면 된다.


2. 왜 Spark?

어떤 문제에 Spark을 활용하려면 왜 Spark이어야 하는지에 대한 명확한 이해가 필요하다. 항상 Spark가 베스트 솔루션은 아니기 때문이다.

2.1. Spark vs. Multiprocessing

분산 환경을 사용하는 가장 큰 이유는 유연한 ‘확장성‘에 있다(다양한 하둡 에코시스템을 통한 효율적인 빅데이터 저장/분석이 가능한 부분도 있음). 데이터가 정적으로 고정되어 있지 않고 지속적으로 증가한다고 생각해보자. 이를 대응하기 위해서 단일 서버는 내부의 하드웨어를 교체하는 등의 고비용이 요구되고, 이를 처리하기 위한 복잡한 저수준의 프로그래밍이 필요하다1. 분산 환경에서는 값싼 서버를 하나씩 늘리면 될 뿐이다.

병렬 연산을 꼭 분산 클러스터를 구성해서 해결해야 하는지를 먼저 따져봐야 한다. 단일 서버에서 많은 메모리와 멀티 코어 수와 함께 multiprocessing을 사용하면 웬만한 크기의 병렬은 해결할 수 있다. 분산 환경에서 실행 계획, 리소스 할당 등 테스크를 돌리는 과정이 있기에 단일 서버보다 성능이 저하되는 경우도 있다.

컴퓨팅 성능에서 가장 느린 부분은 디스크 입출력이다. 대용량 데이터일수록 디스크 입출력은 굉장히 느려진다. 분산 환경은 대용량 파일을 잘게 나눠서 저장하고 여러 곳에서 동시에 입출력할 수 있다. 즉 디스크 I/O를 병렬로 할 수 있는 것인데 이는 데이터가 많아질수록 유리한 구조이다.

이렇듯 문제를 잘 파악하고 단일 서버에서 multiprocessing으로 돌릴 것인지 분산 환경에서 spark으로 돌릴 것인지 결정해야 한다. 만일 데이터가 그렇게 크지 않고 고정되어 있다면 단일 서버도 고려해볼 필요가 있고, 데이터가 지금은 작더라도 확장성이 큰 경우는 Spark를 준비하는 것이 좋다.

2.2. Spark vs. MapReduce vs. Hive

어떤 분산 데이터 처리기를 사용해야 할까? 다양한 종류의 분산 데이터 처리기가 있는데 문제에 맞게 적합한 것을 선택해야 한다. 여기서는 Spark, MapReduce, Hive만 대략적으로 비교해보자. 참고로 Hive는 SQL 언어로 사용할 수 있는 쿼리 엔진이며 내부적으로는 하둡 클러스터로 돌아간다.

통계와 같은 단순한 데이터 처리 또는 데이터 조회는 Hive가 적합하다. 맵리듀스 프로그래밍 없이 SQL문으로 돌릴 수 있기 때문이다. 분산 환경에서 데이터가 parquet 포맷과 같이 읽기 불가능한 타입인데 Hive로 쉽게 볼 수 있다.

기계학습과 같은 반복적인 데이터 플로우대화형 작업을 위해서는 Spark이 좋다. 추상화된 API로 복잡하거나 독특한 분석을 쉽게 프로그래밍을 할 수 있다. Spark은 인메모리 연산으로 빠른 성능의 장점이 있지만, 메모리에 의존하는 만큼 리소스 할당 관련 설정을 잘하지 않으면 오류가 쉽게 발생한다.

MapReduce는 메모리를 사용하지 않기에 Spark 보다 느리지만 더 강한 Failure Tolerance를 가진다. 이에 시간은 조금 오래 걸리지만 안정적으로 수행해야 하는 테스크는 MapReduce가 더 적합하다.


3. Spark 시작하기

Spark를 시작하기 전에 숙지해야 될 사항을 알아보자.

3.1. Spark 프로그래밍 언어

  • 스칼라 (Scala)
  • 자바 (Java)
  • 파이썬 (Python)
  • R

Spark는 스칼라로 작성되고 JVM으로 돌아가지만, 다양한 언어로 작성할 수 있다. 단, Python으로 작성할 때 주의사항이 있다. 저수준 API인 RDD로 작성하거나 사용자 정의 함수를 사용할 때 성능 저하가 발생할 수 있다. 파이썬 데이터를 JVM이 이해할 수 있도록 변환하는 과정에서 큰 비용이 발생하기 때문이다. 고수준 API와 사용자 정의 함수를 제한적으로만 활용한다면 성능면에서 큰 차이는 발생하지 않을 것이다. 이러한 파이썬의 단점이 있지만 다른 언어보다 심플하게 작성할 수 있는 장점도 있다.

3.2. Spark 프로그래밍 API

  • 고수준 API
    • DataFrame
    • SQL
    • Dataset
  • 저수준 API
    • RDD

Spark은 크게 고수준과 저수준 API를 제공한다. 대표적으로 전자는 Dataframe, 후자는 RDD이다. Dataframe은 API가 간결하면서 쿼리 최적화를 자동으로 해주기 때문에 보통 RDD 보다 성능이 좋다고 한다. 그렇다고 RDD를 전혀 사용하지 않는 것은 아니다. Dataframe API에서 제공하지 않는 기능들이나 공유 변수를 다뤄야 하는 기능 등 물리적 데이터 배치를 세밀하게 제어해야 하는 상황에서 RDD가 필요하다.

3.3. Spark 사용자 정의 함수

Spark의 사용자 정의 함수를 통해 자신만의 데이터 처리 함수를 정의할 수 있다. 비용이 큰 함수이기에 최대한 내장 함수를 사용하는 것이 좋다. 위에서 언급한 바와 같이 파이썬 사용자 정의 함수는 많이 사용할 경우 성능 저하의 원인이 될 수 있다는 점을 인지하자. 다음과 같이 두 가지 방식으로 사용자 함수를 정의할 수 있다.

# [방법1]

import pyspark.sql.functions as F
def simple_tokenizer(in_string):
    in_string.split()
simple_tokenizer_udf = F.udf(lambda x: simple_tokenizer(x), ArrayType(StringType))


# [방법2]

import pyspark.sql.functions as F
@F.udf(returnType=ArrayType(StringType))
def simple_tokenizer_udf(in_string):
    in_string.split()

[방법2]처럼 데코레이터를 사용하면 간단하게 표현해 가독성을 키울 수 있다.

3.4. 분산 데이터 저장소

  • 온프레미스 분산 저장소
    • HDFS
    • HBase
  • 클라우드 분산 저장소
    • S3 (Amazon Web Services)
    • Blob Storage (Azure)

Spark는 인메모리 연산을 하지만 데이터 입출력을 위한 분산 데이터 저장소가 필요하다. 입출력 뿐만 아니라 연산 중에도 분산 저장소를 활용한다. 기본적으로 HDFS를 사용하지만 S3와 같은 클라우드 분산 저장소를 사용할 수 있다. 다음과 같이 단지 경로의 prefix만 바꾸면 된다.

path = "hdfs://data/path/is/here" # HDFS
path = "s3://data/path/is/here" # S3
df = spark.read.parquet(path)
...

3.5. 입출력 데이터 타입

  • parquet
  • csv, json, txt, …

Spark에서 데이터 타입은 csv, json, txt와 같은 일반 데이터 포맷을 사용할 수 있으나, parquet과 같은 컬럼 기반 포맷이 압축률과 처리 속도가 빠르므로 더 적합하다. 단, parquet은 읽을 수 없는 포맷이므로 데이터 조회를 곧바로 할 수 없다. 이런 경우는 보통 Hive에 table로 등록하기만 하면, SQL로 조회하거나 다음과 같은 CLI 명령어로 parquet 데이터를 쉽게 확인할 수 있다.

$ hive -e "select * from my_table.my_column limit 100"

3.6. Spark Job 실행 방법

  • 대화형 쉘(Interactive shell)
  • Spark-submit 콘솔 명령어
  • Jupyter notebook, Zeppelin notebook
  • Apache Livy (Restful API)
  • Amazon EMR 의 Step 기능

다양한 방식으로 Spark Job을 실행할 수 있다. 가장 기본적으로 다음과 같이 콘솔에서 spark-submit을 통해 job을 실행할 수 있다.

$ /usr/bin/spark-submit --conf ... --master yarn ... myspark_job.py

마스터노드나 게이트웨이 서버에서 콘솔로 ‘spark’ 혹은 ‘pyspark’을 실행하면 대화형 쉘을 통해서 job을 실행할 수 있다. 비슷하게 jupyter와 zeppelin 노트북을 통해 대화적으로 job을 실행할 수 있다. Apache Livy라는 RESTful API 서버를 통해서도 job을 실행할 수 있다. 또한, EMR의 step이라는 기능에서도 job을 실행할 수 있다.

이처럼 상황에 따라 다양한 방법으로 spark job을 실행할 수 있다. 주의할 점으로 대화형 쉘이나 노트북으로 실행할 경우 따로 spark config 설정하지 않으면 디폴트로 실행된다는 점을 인지하자.

3.7. Spark 실행 모드

  • 로컬 (Local)
  • 클라이언트 (Client)
  • 클러스터 (Cluster)

Spark의 실행 모드는 크게 세 가지가 있고, spark-submit의 deploy-mode 옵션으로 지정할 수 있다. 실행 모드는 job을 실행할 때 요청한 자원의 물리적인 위치를 결정한다. 로컬 모드는 단일 머신에서만 실행되고 병렬을 위해 스레드를 활용한다. 즉 드라이버, 마스터, 익스큐터 모두 단일 JVM에서 실행된다. 로컬이 모드는 주로 테스트용으로 사용된다.

클라이언트 모드는 job이 제출되는 서버에서 드라이버 프로세스가 생성되고 이를 매개체로 Spark 어플리케이션의 진행 과정을 모니터링할 수 있다. 그러나 드라이버와 익스큐터 사이의 지연 시간이 있다. 반면, 클러스터 모드는 내부적으로 동작하며 대화형 쉘과 진행 로그를 실시간으로 볼 수 없지만 애플리케이션 마스터 프로세스가 복원될 수 있기에 탄력적이다. 보통 Spark 어플리케이션을 배포할 때는 클러스터 모드를 사용한다.

$ ./bin/spark-submit --master yarn --deploy-mode cluster myspark.py

로컬 모드에서는 실행 파일의 위치를 프로그램이 이해할 수 있으나, 클라이언트와 클러스터 모드에서는 드라이버 프로세스에서 실행되기 때문에 실행 파일의 경로가 읽히지 않는다.

3.8. Spark 클러스터 구성

  • 온프레미스 장비 + CDH
  • 클라우드 장비
    • Amazon EMR
    • Azure HDInsight

Spark 클러스터를 하드웨어로 초기 세팅을 어떻게 할 수 있을까? 클라우드 서비스는 이러한 클러스터 제품 서비스를 플랫폼으로 제공하기에 몇 번의 클릭만으로 클러스터를 구성할 수 있다. 온프레미스로 구축한다고 하면 하둡부터 하나하나 설치하기보다는 Cloudera의 CDH (Cloudera Distribution Hadoop)를 통해서 구축하는 것이 훨씬 쉽다.

3.9. Spark Configuration

Spark는 인메모리 중심의 연산을 하기 때문에 올바른 Config 설정이 중요하다. (1) 디폴트로 ‘/usr/lib/spark/conf/spark-defaults.conf’에서 설정할 수 있다. (2) Spark Job마다 다르게 설정하려면 spark-submit으로 job을 날릴 때 ‘–conf’ 옵션으로 설정할 수 있고, (3) Spark 프로그램 내부에서 SparkSession의 conf.set 함수로도 설정할 수 있다. Spark Configuration의 자세한 내용은 여기를 참고하길 바란다.

# [방법1] 

$ vi /usr/lib/spark/conf/spark-defaults.conf


# [방법2] 

$ ./bin/spark-submit ... --num-executors 15 --executor-cores 5 --executor-memory 10G myspark.py


# [방법3] # myspark.py

spark.conf.set("spark.executor.memory", '10g')
spark.conf.set("spark.executor.cores", '5')
...

Spark 튜닝하기 글에도 관련 내용이 있습니다.

3.10. 외부 라이브러리 사용

  • 모든 노드에 직접 설치 (ex. pip install)
  • (1) (콘솔에서 spark-submit 명령어) –py-files 인자
  • (2) (Spark 프로그램 내에서) sc.addPyFile 함수

외부 라이브러리를 Spark에서 사용하고 싶으면 어떻게 할까? 단순한 방법은 모든 노드에 해당 라이브러리를 설치하는 것이다. 다른 방법으로는 브로드캐스트를 통해서 모든 노드가 공유하는 방법이 있는데, addPyFile 함수 또는 –py-files 인자에 py 파일이나 zip 파일을 전달하면 된다. 이렇게 세팅한 후에 Spark 프로그램 내부에서 import를 해서 사용하면 된다.

# [방법1]

$ ./bin/spark-submit ... --py-files nlp_lib.zip myspark.py


# [방법2] # myspark.py

spark = SparkSession.builder.appName("GRIMTIND-NLP").getOrCreate()
sc = spark.sparkContext
sc.addPyFile(path_hdfs_or_s3_bucket + '/code/nlp_lib.zip')

# 참고: 브로드캐스트

from nlp_lib.ner import extract_name
name_recognizer_bd = sc.broadcast(extract_name)

(참고) 스파크 프로그램을 하나가 아닌 여러 개의 파이썬 스크립트 파일로 구성하고 싶을 때가 있다. 일반적으로 상대 경로로 import 해서 메인 스크립트(myspark.py)에서 사용할 수 있으나, 모든 노드가 사용하는 사용자 정의 함수를 사용할 때는 (1)과 (2) 방법과 함께 브로드캐스트(broadcast)를 해야 한다. 그렇지 않으면 파이썬 객체가 제대로 직렬화가 되지 않아 오류가 발생한다.

3.11. Spark와 함께 사용되는 플랫폼

  • Apache Airflow
  • Apache Livy
  • Kubernetes
  • Apache Cassandra
  • Hive

스케줄러를 위해 YARN 대신에 Kubernetes를 사용하고, 분산 파일 시스템으로 HDFS 대신에 Cassandra를 사용할 수 있다. 스파크 Job의 배포를 위해서 Airflow를 사용한다. 목적에 맞게 다양한 종류의 플랫폼을 선택할 수 있다. 하둡 에코시스템의 특징이기도 하다.


4. 마무리

Spark을 막상 시작하려고 하면 관련 프로그래밍 문법뿐만 아니라 분산 환경 인프라 등 다양한 부분을 이해해야 합니다. 이 글을 통해 Spark 프로그래밍 전에 대략적으로 고려해야되는 부분이 이정도 있다고 이해하면 좋을 것 같습니다.

(참고) Useful Links – “Apache Spark” 글에 Spark와 관련된 유용한 링크가 있습니다.


5. 각주

  1. 단순히 코어 개수와 메모리 크기를 계속 늘릴 수 있는 단일 서버가 있다고 하자. 이 경우는 하드웨어를 뜯어고칠 필요도 없고 저수준의 API 프로그래밍을 할 필요도 없다. 그러나 이러한 방식에는 문제점이 있다. 워크플로우 아키텍쳐를 따로 관리해야 된다. 모듈별 병목문제, 메모리 문제를 해결하며 최적화를 해야한다. 반면 Spark는 이러한 워크플로우를 내부적으로 알아서 최적화를 해준다.

6. 참고

  1. (논문) Spark: Cluster Computing with Working Sets
  2. (책) 스파크 완벽 가이드
  3. (책) 빅데이터를 지탱하는 기술
  4. (블로그) Running Spark Applications on YARN
  5. (포스트) Algaestudy – Hadoop Ecosystem
  6. (포스트) What is the Hadoop Framework
  7. (포스트) Spark vs Hadoop MapReduce
  8. (포스트) Comparing Apache Hive vs. Spark
  9. (포스트) File Format Benchmark – Avro, JSON, ORC, & Parquet