Spark는 대용량 데이터 처리를 분산 환경에서 효율적으로 처리할 수 있는 프레임워크입니다. Spark는 여러 층의 데이터 추상화 구조와 분산 아키텍쳐가 녹아있는 복잡한 프레임워크이기 때문에 Job이 단지 실행이 되는 것을 넘어 더 효율적으로 처리될 필요가 있습니다. Spark 튜닝을 잘하면 더 강건하고 효율적인 Spark Job을 만들 수 있습니다. Spark 시작하기 글을 먼저 보시면 도움이 됩니다.


1. 서론

1.1. Motivation

Spark 프로그래밍을 하다보면 문법에는 문제가 없지만 Spark Job의 실행 시간이 매우 느려지거나 오류로 종료되는 등 다양한 시행착오를 경험하게 된다. 대부분 분산 리소스를 관리하는 Spark와 YARN 환경 설정을 제대로 설정하지 못해 생기는 원인이다.

Spark Job을 작성하고 정상적으로 종료되고 결과물을 확인한 것으로 끝이 아니다. 이의 성능 최적화 과정이 필요하다. 복잡할 프로그램일수록 최적화로 인한 개선 폭이 크다. 또한, 분산 컴퓨터에서 돌아가는 작업은 보통 고비용이므로 필수적으로 확인할 필요가 있다.

대부분의 문제는 Spark Job의 실행 로그를 통해 확인할 수 있다. YARN 리소스 매니저에는 다양한 종류의 실행 로그가 쌓인다. 실행 시간이 느려지는 문제는 다양한 원인이 있지만 간단하게 확인하는 방법으로 기대한 Executor 개수만큼 Job이 잘 돌아갔는지 확인하는 방법이 있다.

프로그램이 종료되는 많은 원인 중 하나는 Out of Memory (OOM) 에러이다. 참고로 아래는 OOM 에러를 유발하는 다양한 로그의 신호들이다[2]. Exceeding Physical Memory가 특히 빈번하게 발생한다.

Java Heap Space

WARN TaskSetManager: Loss was due to 
java.lang.OutOfMemoryError 
java.lang.OutOfMemoryError: Java heap space


Exceeding Physical Memory

Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits. 12.4 GB of 12.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits. 4.5GB of 3GB physical memory used limits. Consider boosting spark.yarn.executor.memoryOverhead.


Exceeding Virtual Memory

Container killed by YARN for exceeding memory limits. 1.1gb of 1.0gb virtual memory used. Killing container.


Exceeding Executor Memory

Required executor memory (1024+384 MB) is above the max threshold (896 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb

다음과 같이 Timeout 에러 혹은 디스크 에러와 관련된 Unhealthy 에러도 발생하기도 하는데, 이들도 역시 OOM 에러와 관련이 있다.

HeartbeatInterval Timeout

WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. This timeout is controlled by spark.executor.heartbeatInterval
...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]


GC Allocation Failure (Exit status: 52)

2020-06-11T08:07:04.163+0000: [GC (Allocation Failure) 2020-06-11T08:07:04.163+0000: [ParNew: 136396K->2200K(147456K), 0.0069856 secs]
799275K->667990K(1140836K), 0.0070638 secs] [Times: user=0.02 sys=0.01, real=0.01 secs]
2020-06-11T08:07:04.319+0000: [GC (Allocation Failure) 2020-06-11T08:07:04.319+0000: [ParNew: 133272K->550K(147456K), 0.0034409 secs]
799062K->666354K(1140836K), 0.0035160 secs] [Times: user=0.01 sys=0.01, real=0.00 secs]
...


DIsk Space Issue

Yarn unhealthy Reason : 1/1 local-dirs are bad: /mnt/yarn; 1/1 log-dirs are bad: /var/log/hadoop-yarn/containers
...
Yarn unhealthy Reason : 1/1 local-dirs are bad: /mnt/yarn; 1/1 log-dirs are bad: /var/log/hadoop-yarn/containers
...

Spark 튜닝으로 성능 최적화와 함께 위와 같은 문제를 해결할 수 있다. Spark 튜닝은 제한된 리소스를 잘 배분하기 위한 Spark 환경설정과 효율적인 분산 데이터 처리를 위한 Spark 프로그래밍 기법으로 구성된다. 이들은 각각 하드웨어와 소프트웨어에서 제어를 하지만 서로 긴밀하게 연결되어 있다.

1.2. Spark 배경지식

Spark 튜닝을 알아보기 전에 Spark에 대한 기본적인 지식을 이해해보자. 특히, 추상화된 리소스의 단위를 이해하는 것이 중요하다.

Spark Job 실행 흐름 (Executor 중심)

  • Spark와 YARN은 효율적인 클러스터 리소스 관리를 위해 여러 개의 논리적인 단위로 Spark Job을 구성함
  • 구성 단위는 Application, Job, Task, Stage, Worker, Executor 등 다양함
  • Executor는 실질적으로 연산을 하는 프로세스이며 이를 기준으로 메모리 계층을 구성하고 병렬성 크기를 결정함
  • Spark는 물리적으로 보통 1개의 Master와 1개 이상의 Worker 노드로 구성됨
  • 1개의 Worker 노드는 1개 이상의 Executor 프로세스를 동작함
  • Spark Job의 실행을 위해 보통 1개 이상의 Executor 프로세스가 실행됨
  • Spark Job은 DAG 스케줄러에 의해 1개 이상의 Task로 쪼개짐
  • 1개의 Executor는 1개 이상의 Task를 순차적으로 혹은 병렬적으로 실행함
  • 1개의 Executor는 하나의 JVM(Java Virtual Machine)을 가짐
  • 각각의 Executor는 똑같은 개수의 Core와 똑같은 크기의 Memory (Heap)를 가짐
  • 분산 능력이 없다면 Spark Job은 오직 하나의 Executor에서 실행됨
  • 병렬성이 아예 없다면 하나의 Executor와 하나의 Core에서 실행됨

Executor 메모리 계층 구성

Spark와 YARN은 다음과 같이 Executor 컨테이너 단위로 메모리 계층을 구성한다[2]. 다양한 파라미터 중에 오직 spark.executor.memory 한 개만 실제로 Task를 실행할 때 사용된다. Spark이 요청받은 리소스 정보는 YARN에게도 전달된다. Spark 파라미터와 함께 YARN 파라미터도 있다.

  • yarn.nodemanager.resource.memory-md: 1개의 노드에 있는 모든 Executor 컨테이너들이 사용하는 메모리 총합
  • spark.yarn.executor.MemoryOverhead: 오버헤드를 위한 여유분 메모리 크기
  • spark.executor.memory: 1개의 Executor가 사용하는 메모리 크기
  • spark.memoryFraction: Task 실행, 셔플, 조인, 정렬, 집계를 위한 데이터 저장 비율
  • spark.storage.memoryFraction: Cache, Broadcast, Accumulator를 위한 데이터 저장 비율

RDD (Resilient Distributed Datasets)

Spark 아키텍쳐는 크게 RDD와 DAG로 2가지의 주요한 추상화(abstraction)을 가진다. RDD는 특히 Spark 프로그래밍 튜닝 기법에서 많이 다룬다.

  • ”Spark = RDD + Interface” 공식처럼 RDD는 분산 환경과 Spark에 맞는 특별한 자료구조임
  • RDD는 Spark 고유의 자료구조이며 분산이 용이하게 여러 개의 파티션으로 구성됨
  • Read-only와 Immutable의 특징으로 Fault-tolerance를 쉽게 극복함
  • Spark Job은 RDD를 가공해 새로운 RDD를 얻는 식의 반복으로 구성됨
  • RDD는 변환(transformation)과 액션(action)의 두 가지 종류의 operator가 있음
  • Lazy-execution으로 Operation 파이프라인을 최적화시킴
  • 변환 함수는 Narrow와 Wide로 구분되며 Wide는 네트워크를 통해 데이터 셔플이 발생하는 아주 비싼 operator임

2. Spark 환경설정

Spark는 분산 환경에서 동작하고 메모리를 주로 활용하기에 그만큼 리소스 할당이 중요하다. 데이터 크기, Spark API 함수 종류, 그리고 YARN 스케줄링 방식에 따라 적합한 리소스 할당 방법이 필요하다. Spark는 Spark property, Environment 변수, Logging이라는 세 가지 종류의 환경 변수가 있다. 이 글에서는 Spark property만 다룬다1(이 글에선 편의상 Spark property를 Spark 파라미터라고 명명). 또한, Spark는 YARN과 함께 동작하기에 YARN 환경 변수도 함께 고려한다.

2.1. 설정 방법

Spark 파라미터는 다음과 같이 명령어의 인자, API 함수, conf 파일을 수정하면서 설정할 수 있다.

# (1) bin/spark-submit 명령어

$ ./bin/spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 20g \
    --executor-memory 20g \
    --executor-cores 4
# (2) SparkConf API

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()
spark.conf.set("spark.master", "yarn")
spark.conf.set("spark.submit.deployMode", "cluster")
spark.conf.set("spark.driver.memory", "20g")
spark.conf.set("spark.executor.memory", "20g")
spark.conf.set("spark.executor.cores", "4")
sc = spark.sparkContext
# (3) spark-defaults.conf 파일

$ vi $SPARK_HOME/conf/spark-defaults.conf

Spark 파라미터의 종류는 많고 다양하다. 한 번 세팅하면 잘 바뀌지 않는 정적인 파라미터의 경우는 (3) 방법으로 세팅한다. 반대로 Spark Job마다 자주 바뀌는 동적인 파라미터는 (1) 또는 (2) 방법으로 설정한다.

(2) SparkConf API 방법을 사용할 때 주의할 점이 있다. Deploy 모드가 ‘client’일 경우에는 `spark.driver.memory`, `spark.driver.extraClassPath` 등 driver와 관련된 몇 가지 파라미터는 SparkConf API 방법으로는 세팅할 수 없다.

Spark은 분산 데이터 처리기로 리소스 관리자인 YARN과 같이 실행된다. 이 때문에 YARN 환경 설정도 잘 세팅해야 한다. ‘yarn-site.xml’ 파일로 설정한다.

$ $HADOOP_HOME/conf/yarn-site.xml

2.2. 파라미터 튜닝 방법

성능 최적화를 위해 Spark 파라미터와 YARN이라는 두 가지 종류의 환경 변수가 있다. 이들의 값을 어떤 기준으로 설정할 수 있을까?

YARN 파라미터

  • yarn.nodemanager.resource.memory-md: 1개의 노드에 있는 모든 Executor 컨테이너들이 사용하는 메모리 총합
  • yarn.nodemanager.resource.cpu-vcores: 1개의 노드에 있는 모든 Executor 컨테이너들이 사용하는 코어 수 총합

노드 하나가 가지는 메모리 크기와 코어 개수를 참고하여 위 파라미터의 값을 할당한다. 각 노드에 OS와 하둡 데몬을 돌릴 리소스가 필요하기에 물리적인 크기와 똑같은 리소스 할당은 피해야 한다. 예를 들어, 노드 하나가 16개 코어를 가지고 64GB 메모리를 가진다면 다음과 같이 설정한다.

yarn.nodemanager.resource.memory-md = (64-1) x 1024 = 64512
yarn.nodemanager.resource.cpu-vcores = (16-1) = 15

다음과 같은 Upper Limit 조건을 항상 만족해야 한다. 그렇지 않으면 OOM 에러가 발생한다.

yarn.nodemanger.resource.memory-md > spark.yarn.executor.MemoryOverhead + spark.executor.memory

YARN 파라미터는 정적인 옵션이다. 따라서 클러스터를 초기에 구성했을 때와 하드웨어 스펙을 변경했을 때 한 번씩만 체크해주면 된다.

  • deploy-mode

Spark Job을 제출할 때 배포 모드(Deploy Mode)를 선택한다. 대표적으로 Client와 Cluster 모드가 있다. Client 모드는 Driver가 Client 프로세스에서 돌기에 로그를 사용자가 실시간으로 확인할 수 있어 개발할 때 주로 사용된다. 단, Driver 메모리 사용에 유의해야 한다. Cluster는 Driver가 YARN에 관리되는 어플리케이션 마스터 프로세스 내에서 실행되므로 효율적인 자원 관리가 가능하여 서비스에 배포할 때 주로 사용된다. 상황에 맞게 사용하면 된다.

Spark 파라미터

더 많은 종류가 있지만 여기서는 다음과 같이 주요한 파라미터만 고려한다.

  • spark.executor.memory: 하나의 executor에서 Task를 실행하는 데 사용하는 메모리 크기
  • spark.executor.cores: 하나의 executor에서 Task를 실행하는 데 사용하는 (가상) 코어 개수
  • spark.driver.memory: 하나의 driver에서 Task를 실행하는 데 사용하는 메모리 크기
  • spark.driver.cores: 하나의 driver에서 Task를 실행하는 데 사용하는 (가상) 코어 개수
  • spark.executor.instances: Job 실행을 위한 executor 개수 설정
  • spark.default.parallelism: join, reduceByKey와 같은 Wide 변환 후에, 그리고 파티션 개수를 지정하지 않은 경우에 대한 RDD의 디폴트 파티션 개수

전체 노드의 메모리, 코어 개수 등의 하드웨어 리소스는 항상 고정되어 있다. 이 전제 때문에 다음 파라미터들은 서로 비례 혹은 반비례 관계를 가진다. Executor 개수를 늘리면 코어 개수와 메모리 크기를 줄일 수 밖에 없다. CPU와 메모리는 Spark과 YARN의 메인 리소스이다.

분산 클러스터에 6개의 Worker 노드가 있고 각 노드는 16개 core, 64GM 메모리를 가진다고 하자. 위의 관계를 이용하여 다음과 같은 양극단의 케이스를 도출할 수 있다.

6number of executor-instances17
15number of executor-cores5
63Gsize of executor-memory19G

각 파라미터 값의 증가와 감소에 따라서 다음과 같은 특징이 있다.

  • 하나의 Executor에서 15개로 최대 개수의 코어를 부여하면 하나의 JVM에서 너무 많은 일을 하기 때문에 HDFS의 I/O 성능이 떨어진다. Executor 당 5개의 코어를 넘지 않는 것이 좋다.
  • 하나의 Executor에 매우 적은 코어 개수를 부여하면 하나의 JVM 자원을 여러 개의 Task를 돌리는 이점을 가지지 못한다.
  • 하나의 Executor에 많은 메모리 사이즈를 부여하면 Garbage Collection 딜레이가 발생한다.
  • 큰 크기의 메모리를 할당할 때 YARN의 Upper Limit 조건을 유의해야 한다.
  • Executor 개수만큼 브로드캐스트 데이터 복제가 일어난다.
  • Executor 개수가 많을수록 많은 양의 HDFS I/O 연산을 할 수 있다. 반대로 줄일수록 병렬성 크기는 줄어든다.

일반적으로 변환 함수 위주로 Spark Job이 구성되기 때문에 오른쪽(17,5,19G) 파라미터 조합이 좋다. 그런데 데이터 셔플이 많이 요구되는 함수 위주로 구성된다면 왼쪽(6,15,63G) 조합이 좋을 때도 있다. 위 자료는 참고만 하고 무엇보다도 하드웨어 리소스, 데이터 특성, 자신의 operator 등에 맞게 조합을 잘 선택해야 한다. AWS 블로그[2]에서는 어떤 사이즈의 클러스터에도 잘 동작하는 Spark 파라미터의 조합을 경험에 근거한 계산식으로 제공하기도 한다.


3. Spark 프로그래밍

3.1. 파티셔닝

분산 데이터 처리는 큰 데이터를 작은 데이터들로 분할해서 각개격파하는 Divide and Conquer 철학을 가진다. Spark에서는 작은 데이터를 지칭하는 논리적인 데이터 청크 단위인 파티션(partition)이 있다. 하나의 파티션에서 하나의 테스크가 실행된다. 이론적으로는 하나의 코어가 한 개의 파티션을 점유하지만 실제론 2~3배의 파티션을 가져도 된다.

파티션 개수를 제어하는 파티셔닝 기법은 클러스터 성능에 큰 영향을 준다. 각 파티션이 일정 크기의 데이터(128MB ~ 1GB)를 가지는 것에 한해서 파티션 개수를 최대화하여 병렬성을 높이는 것이 좋다. 파티션이 매우 적은 크기의 데이터를 가진다면 작업 능률은 떨어진다. 파티셔닝은 개수를 제어하는 것뿐만 아니라 파티션의 데이터 분포를 균등하게 조절하는 역할도 한다. 데이터가 한 쪽 파티션에 쏠리게 되면 추측 실행(speculative execution)이 증가해 성능 저하를 유발하고 메모리 에러로 프로그램이 종료되기도 한다.

데이터를 로딩할 때 Spark는 데이터 크기에 맞게 자동으로 적절한 파티션 개수를 설정한다. 이후에 Narrow 변환 함수만 실행된다면 파티션 개수의 변함은 없다. Join, GroupBy와 같이 데이터 셔플을 유발하는 Wide 변환 함수가 실행되면 디폴트 200개로 재설정된다2. 데이터 크기가 적당하고 심플한 API 함수로만 구성된다면 파티셔닝 기법은 크게 중요하지 않다. 하지만, 데이터가 크고 복잡한 API 함수 파이프라인으로 구성된다면 적재적소에 파티셔닝을 해줘야 한다.

파티셔닝해야 하는 한 가지 예를 살펴보자[4]. Filter 변환 함수를 실행하면 보통 데이터 분포가 균등하지 않고 편향(skewed)된다.

이러한 경우 다음과 같은 파티셔닝 방법을 사용하여 고른 데이터 분포를 가지도록 한다. 주의할 점은 파티션 함수가 고비용이므로 적합한 상황에서만 사용해야 한다.

  • repartition(): 파티션 개수를 줄이거나 늘릴 때 사용
  • coalesce(): 파티션 개수를 줄일 때 사용 (데이터 셔플을 하지 않는 특별한 repartition 함수와 같다.
  • partitionBy(): Disk 파티셔닝.

최적의 파티션 개수는 여러 번의 실험을 통해 확인해야 한다. 기본적으로 클러스터 코어 수의 2배, 즉 모든 작업자 노드의 총 코어 수의 2배로 설정하는 것이지만, 데이터 종류와 크기, Operation 함수에 따라 크게 달라진다.

리파티션 계열의 함수는 해쉬를 기반으로 데이터를 분배한다. 데이터 샘플이 많을수록 비교적 정확히 균등하게 분배되지만 샘플이 작을수록 간혹 잘 못하는 경우도 있다. 이 경우는 기준이 되는 새로운 컬럼을 인위적으로 생성해 정확히 균등하게 분배하도록 한다. 이를 Salting 기법이라 한다.

3.2. 캐싱

RDD는 동일한 프로그램에서 액션 함수가 호출될 때마다 데이터 로드부터 처음부터 다시 계산된다. RDD를 캐싱하면 이후에 실행되는 액션 함수는 재평가없이 재사용할 수 있다. 같은 RDD 또는 Dataframe에 반복되는 액션 함수를 실행시킬려면 캐싱이 유용하다. RDD는 persist, DataFrame은 cache 함수로 캐싱할 수 있다. RDD로 접근하는 persist는 스토리지 레벨에 따라 캐싱할 수 있는 장점이 있다.

  • RDD.persist()
  • DataFrame.cache()

한 가지 예로, 다른 조건의 filter 함수를 여러 번 실행할 때 캐싱을 한다.

df = df.cache()
noun_df = df.filter(F.col('pos_type') == 'NOUN')
verb_df  = df.filter(F.col('pos_type') == 'VERB')
...
df.unpersist()

3.3. 브로드캐스팅

브로드캐스트(broadcast) 변수는 모든 작업 노드들이 접근할 수 있는 공유변수이다. Driver 프로세스에 있는 데이터 혹은 함수를 클러스터 전역에서 사용하는 기능이 있지만 상황에 맞게 이러한 공유 변수를 잘 활용한다면 성능 이점도 살릴 수 있다. 한 가지 예를 들어보자. 작은 DataFrame과 큰 DataFrame을 조인하려고 한다. 그냥 하면 비싼 셔플 연산이 필요하다. 작은 DataFrame을 브로드캐스팅한다면 셔플 연산이 필요없이 조인을 할 수 있다. 특히 큰 DataFrame과 조인할 때 매우 효율적이다.

from pyspark.sql.functions import broadcast

big_df.join(broadcast(small_df), big_df.id == small_df.id)

3.4. 기타

Spark의 힘은 데이터를 여러 노드에 분산해서 처리하는 것이다. RDD.collect() 메소드를 호출하면 분산되어 있던 모든 데이터를 마스터 노드(driver 프로세스)로 보내진다. 이 경우 메모리 에러가 발생할 확률이 높고 분산의 힘을 전혀 사용하지 못한다. 만약 꼭 사용해야 한다면 ‘take’ 또는 ‘takeSample’ 메소드를 사용하거나 필터링하여 데이터의 일부만 가져오는 것이 좋다.


4. 그 외 기법

위의 Spark 환경설정과 Spark 프로그래밍의 범주에는 포함되지 않지만 스파크 프로그램을 효율적으로 실행하기 위해서 추가적으로 고려해야 되는 사항이 있다.

대용량 데이터를 다룰수록 분산 파일 시스템의 디렉토리 구성을 잘 구분해야 한다. 매번 전체 데이터를 로딩해서 분석에 필요하지 않는 데이터도 읽을 필요가 없다. 큰 연산 비용이 요구된다. 데이터 처리 단위에 따라 폴더 구성을 세밀하게 해야 한다. 이는 특히 데이터가 매우 많은 로그 데이터를 다룰 때 중요하다.


5. 마무리

분산 처리의 힘을 제대로 발휘하려면 올바른 환경설정과 프로그래밍 기법으로 상황에 맞게 튜닝을 잘 해야 합니다. 성능최적화를 하는 것과 더불어 튜닝을 제대로 하지 않으면 오류가 발생하여 프로그램이 종료되기 쉽습니다. 잘 알지 못한 상태로 로그를 이해하기 어렵습니다. 위의 튜닝 기법들을 이해하면 로그 분석이 좀 더 쉬워질 수 있습니다. Spark 튜닝하는 방법은 정답이 없습니다. 데이터 사이즈, 클러스터 스펙, 스파크 메소드에 등에 따라 달라집니다. 문제에 맞게 어떤 부분을 중점을 줘야하는지 이해할 필요가 있습니다.

Useful Links – “Apache Spark” 글에 Spark와 관련된 유용한 링크를 주제/사이트별로 정리해놓았습니다.


6. 각주

  1. Spark property는 Application Property, Runtime Environment, Dynamic Allocation 등 다양한 타입의 property가 있다. 자세한 내용은 [1]을 참고하길 바란다.
  2. 이 개수는 Dataframe API의 경우 `spark.sql.shuffle.partitions`로, RDD API는 `spark.default.parallelism`로 재설정할 수 있다. 해당 파라미터가 디폴트로 200이 할당되어있다.

7. 참조

  1. (공식) Spark Configuration
  2. (블로그) Best practices for successfully managing memory for Apache Spark applications on Amazon EMR
  3. (블로그) How-to: Tune Your Apache Spark Jobs (Part 2)
  4. (책) Data Analytics with Spark Using Python
  5. (블로그) Oh My God!! Is my Data Skewed ?
  6. (블로그) Apache Spark: Caching
  7. Apache Spark: The number of cores vs. the number of executors
  8. Partitioning on Disk with partitionBy