Skip to content

[spark] word count example, related articles

Myungchul Shin edited this page Dec 31, 2016 · 36 revisions

num-executors, executor-memory, executor-cores

예를 들어, 

WORKER NODES : 68
Alive Workers: 60
cores per node : 24 (2 physical processor, 6 cores per processor, hyper-threading)
available total cores : 1440
memory per node : 32G
SPARK_WORKER_INSTANCES=1
SPARK_EXECUTOR_INSTANCES = 2
SPARK_EXECUTOR_CORES = 1
SPARK_EXECUTOR_MEMORY = 1G

SPARK_WORKER_CORES=16
이 설정은 각각의 node에서 사용 가능한 최대 core의 수를 의미한다. 
따라서, '60 * 16 = 960 cores'
spark master web ui를 보면 아래와 같이 보일 것이다. 
Cores in use: 960 Total, 550 Used

위와 같은 설정에서는 60대의 worker node 각각에 대해서 1개의 worker가 실행되고
worker는 2개의 executor를 실행시킨다. 또한, 개별 executor에서 사용 가능한 core는 1개.
(executor에서 여러개의 task가 돌지만 1개의 core만 사용한다는 의미)
따라서, 총 60개의 worker, 120개의 executor가 초기에 실행되고 
executor가 사용하는 core의 총 수는 120이라는 의미이다.  


그런데 만약, 아래와 같이 실행시킨다면, 

spark-submit --master yarn-client --total-executor-cores=100 --num-executors=20 --executor-cores=5 --executor-memory=2G

submit되는 현재 job을 위해서 cluster에 있는 executor들을 사용하는데 최대 100개의 core를 요청한다(사용한다). 
'num-executors''SPARK_EXECUTOR_INSTANCES'에 대응하는 것으로 이 옵션을 제외하면 동적으로 필요한 수만큼 계산된다. 
(즉, 명시적으로 --num-executors 옵션을 부여할 필요가 없다는 뜻이다)
여기서는 값을 20으로 줬기 때문에 20개의 executor가 실행된다.

각각의 executor에서 최대 5개의 core를 사용할 수 있다는 것은
executor에서 multi-thread로 실행되는 task들이 5개의 core를 활용 할 수 있다는 의미이다. 

그렇다면 아래와 같이 실행하는 경우는? 

spark-submit --master yarn-client --total-executor-cores=50 --executor-cores=5 --executor-memory=2G

이때는 (total-executor-cores/executor-cores) = (50/5) = 10개의 executor가 실행되며,
각각의 executor는 5개의 core와 2G memory를 점유한다. 

* 위에서 executor가 '실행된다'라고 기술했는데, 사실 없어서 새롭게 실행될 수도 있고 기존에 있는 executor(id로 구별)를
  할당 받아서 task를 실행할 수도 있다. 그런 의미에서 executor는 task 실행을 위한 box 정도이고 실제로
  core, memory는 해당하는 task들에 할당된다고 볼 수 있다. 
* yarn-client와 yarn-cluster는 둘 다 yarn을 사용하는데, client의 경우는 driver가 코드를 실행시키는 서버에 위치한다.
  yarn-client로 실행시키면 제어하거나 모니터링하기 좀더 용이한 측면이 있다.

sample snippet to read, process, and save json files

# -*- coding: utf-8 -*-
import sys
import datetime
from dateutil.relativedelta import relativedelta

from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
...
sc     = SparkContext(pyFiles=['lib.py']) # with other python libraries
sqlCtx = SQLContext(sc)
...

# input data format : JSON
schema_data = StructType([
              StructField('x',      StringType(), False),
              StructField('y',      StringType(), False),
              StructField('z',      StringType(), False),
         ])

def func(x, resource_for_func) :
   # .....
   return x

p  = sqlCtx.jsonFile('/path/to/hdfs', schema_data)
# p  = sc.textFile('/path/to/hdfs')
# if input data format is TEXT

# check valid path if needed
try : p.first()
except Exception, e :
  sys.stderr.write(str(e) +'\n');
  sys.exit(1)

resource_for_func = initialize('/resource')

pp = p.map(lambda x : x.asDict()) \
      .map(lambda x : (x, 1)) \
      .reduceByKey(lambda x1, x2 : x1 + x2, numPartitions = 1024) \
      .flatMap(lambda x : func(x, resource_for_func)) \
      .filter(lambda x : exp)

pp.toDF().save('/path/to/hdfs', 'json')
...

using external c extension

tri 사전을 빌드한다.
$ python mktrie.py -p /local/topn.txt -b /local/topn.tri

name=filtering
executors=32
memory=5G
prog="filtering.py --topn=./topn.tri --out=/output/path"
spark-submit --master yarn-client --name ${name} --num-executors ${executors} --executor-memory ${memory} --files libtrie.so --archives "/local/topn.tri#topn.tri" ${prog}

이것은 mmap이 지원되는 trie를(지원 안돼도 상관없지만 메모리를 적게 사용하려면 필요) 빌드해두고,  
filtering.py에서 데이터를 읽어서 topn.tri에 해당하는 것만 필터링하는 코드의 일부분이다. 
여기서 사용되는 trie의 c extension library는 --files 옵션을 이용해서 executor의 working directory로 전송한다.
마찬가지로 topn.tri도 --archives 옵션을 이용해서 working directory로 전송한다.
(--files와 --archives는 기본적으로 같은 옵션이다) 
'/local/topn.tri#topn.tri'에서 '#' 뒤쪽은 해당 파일이 executor의 working directory에 전송된 이후, 
참조되는 이름이다.  

topn.tri를 slave node의 동일 경로에 미리 전송시켜두고, '--topn=/remote/topn.tri'로 참조하게 하는 방법도 있다.
이렇게 하는 것과 '--archives'를 사용하는 방법 중에 어느 방법이 더 빠를지는 계산해봐야한다. 별 차이는 없어야한다.

filtering.py 내부에서 trie를 참조하는 코드는 아래와 같다.

# global
__topn_data = None

먼저 전역 변수를 하나 선언. topn.tri를 load해서 이 변수에 저장할 것이다. 

def init_topn(topn_path) :
    '''
    한번 로딩된 데이터를 다시 로딩하지 않도록 한다.
    '''
    global __topn_data
    if __topn_data : return __topn_data
    sys.path.append('.')
    import libtrie as trie
    topn_data = trie()
    topn_data.load(topn_path)
    __topn_data = topn_data
    return topn_data

def check_topn(topn_path, string) :
    topn_data = init_topn(topn_path)
    string_key = string.replace(' ','').lower()
    key = string_key.encode('utf-8')
    try : ret = topn_data.find(key)
    except : return None
    return ret

p = rdd_union.map(lambda x : x.split('\t')) \
      .filter(lambda x : (x[7].strip() and len(x[7].strip()) <= 96)) \
      .filter(lambda x : check_topn(topn_path, x[7])) \
      ...

rdd에 대해서 check_topn()으로 검사해서 필터링하는 코드인데, check_topn() 내부에서 trie를 load하는 것이 조금 다르다. 
외부에서 load한 다음 그것을 check_topn()의 param으로 주는 방식을 생각할 수 있는데, 그런 방법은 동작하지 않는다. 
모든 것은 결국 check_topn() 내부에서 해결해야한다. 

그런데, 모든 검사마다 trie를 load하는 것은 매우 비효율적이므로 한번 로드한 것을 재활용하게 하는 코드가 init_topn()이다.  

multiple groupby

예를 들어서, 웹사이트 로그를 분석한다고 하자. 
하루에 쿼리가 1000 이상 들어오고 쿼리 하나당 노출되는 컬렉션의 코드(뉴스, 동영상, 이미지, ...) 찍힌다. 이런 상황에서
특정 쿼리에 대해 노출되는 컬렉션 코드의 수를 병합해서  컬렉션별로 비율을 계산하려고 한다. 
이런 종류의 작업은 보통 groupby를 사용하게 되는데, (key,value list)에서 value list가 어마어마하게 길어진다. 
따라서,  기간에 대해 groupby를 하면 대부분 out-of-memory 문제가 발생하게 된다. 
이를 해결하는 방식은 대략 아래와 같다. 

 p  = rdd.map(lambda x : x.split('\t')) \
          .map(lambda x : func1(x)) \
          .filter(lambda x : x) \
          .groupByKey(numPartitions=1024) \
          .map(lambda x : func2_1(x)) \
          .groupByKey(numPartitions=1024) \
          .map(lambda x : func2_2(x)) \

, query 자체에 대해서 groupby하는 것이 아니라, 
- (query,ymd) 등으로 하루씩 묶어서 groupby를 먼저하고
-  개별 결과물에 대해서 부분적으로 통합한다.(func2_1)
- 그리고나서, (query,ymd) 통합된 결과물을 다시 query로 groupby
- func2_2를 이용해서 최종처리한다.

이렇게 groupby를 여러번 쪼개서 하는 기법은 pig에서 mapper에서 
reducer의 역할을 먼저 수행하게 하는 트릭과 비슷하다. 
(참고 https://github.com/dsindex/blog/wiki/%5Bpig%5D-map-reduce-for-unbalanced-key-distribution )

사실 groupby가 아니라 reduceby로 처리할  있으면 이처럼 복잡한 과정을 거치지 않아도 된다.
하지만, reduceby로는 처리하기 어려운 복잡도를 갖는 경우라면 유용할  같다.
Clone this wiki locally