📎 Spark User Guide, Spark The Definitive Guide
4. MLlib
4.1. MLlib
spark MLlib vs ScikitLearn
- spark DF 기반으로 ML 구동
- feature vectorization: 여러개 columns을 하나로
- estimator가 fit을 호출할 때 model 객체를 반환하고 여기서 예측 수행
- pipeline 사용 권장
4.2. MLlib 구성요소
Transformer
: StringIndexer, OneHotEncoder, VectorAssebler, StandardScaler, Tokenizer
- 원론적으로는 하나의 DataFrame을 다른 DataFrame으로 변환하는 기능을 가진 클래스들을 총칭
-
Transformer는 transform() 메소드로 변환 수행. transform()의 입력 인자로 DataFrame을 입력하고 변환된 DataFrame을 반환
-
보통 레이블/원-핫 인코딩을 하는 인코더, 스케일링 변환을 하는 스케일러, Feature Vectorization 등의 작업 수행
- 특이하게도, Estimator가 fit() 하여 생성된 ML 모델도 Transformer 역할을 수행. 즉 모델이 예측하려면 transform() 메소드를 호출하는데 여기에 입력 인자로 DataFrame이 필요하며, 반환된 DataFrame에 예측 결과를 가지고 있음
Estimator
: Classification, Regression, Clustering, Collaborative Filtering
from pyspark.ml.classification import DecisionTreeClassifier
dt_estimator = DecisionTreeClassifier(featuresCol='features', labelCol='Survived')
dt_model = dt_estimator.fit(train_sdf) # train 데이터로 학습 후 테스트 데이터로 예측
predictions = dt_model.transform(test_sdf) # test 데이터로 적합 후 해당 결과로 정확도 평가
Evaluator
: RegressionEvaluator, BinaryClassificationEvaluator, MultiClassificationEvaluator
- BinaryClassificationEvaluator는 roc-auc 지표만 제공
- 정확도(accuracy)를 얻기 위해서는 이진 분류 여부와 관계없이 MulticlassClassificationEvaluator 클래스를 사용해야 함
- precision, recall, f1-score를 얻기 위해서는 이진 분류 여부와 관계없이 MulticlassClassificationEvaluator 클래스를 사용해야 함
- MulticlassClassificationEvaluator로 얻어지는 precision, recall은 positive/negative 예측 데이터 건수를 반영한 weighted precision, weighted recall 값임
- 편리함을 위해서라면 예측 결과 Spark DataFrame을 pandas DataFrame으로 변경하고 사이킷런 평가 API를 사용하는 것이 더 나은 선택일 수 있음
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol='Survived', predictionCol='prediction', metricName='accuracy')
print(accuracy_evaluator.evaluate(predictions)) # 위 Estimator로 추출한 결과 predictions 넣어서 정확도 확인
Pipeline
- 여러개의 개별적인 Transformer의 변환 작업, Estimator의 학습 작업을 일련의 Stage 연결을 통해 간단한 API 처리로 구현할 수 있게 만들어 줌
- Pipeline은 개별 변환 및 학습 작업을 Stage로 각각 정의하여 Pipeline에 등록한 뒤 Pipeline의 fit() 메소드를 호출하여 연결된 Stage 작업을 수행하여 학습 진행, 학습 결과로 PipelineModel이 반환되며 이 PipelineModel에서 예측을 위한 변환 및 예측 작 업을 transform() 메소드로 수행
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
def encode_columns(sdf, input_cols=None, encode_gubun='label'):
# label encoding과 one hot encoding 변환 컬럼명 지정.
label_cols = ['label_'+col for col in input_cols]
onehot_cols = ['onehot_'+col for col in input_cols]
#pipeline의 stages로 지정된 StringIndexer와 OneHotEncoder 객체 생성.
label_encoder_stage = StringIndexer(inputCols=input_cols, outputCols=label_cols)
onehot_encoder_stage = OneHotEncoder(inputCols=label_cols, outputCols=onehot_cols)
# encode_gubun이 label이면 StringIndexer stage만 등록, onehot이면 StringIndexer, OneHotEncoder 모두 등록.
stages = []
if encode_gubun == 'label':
stages = [label_encoder_stage]
else:
stages = [label_encoder_stage, onehot_encoder_stage]
pipeline = Pipeline(stages=stages) # 위에서 만들어진 객체로 파이프라인 생성
sdf = pipeline.fit(sdf).transform(sdf) # 파이프라인 학습 후 적합
return sdf
Model Selection( and Tuning)
: randomSplit, TrainValidationSplit, CrossValidator, ParamGridBuilder
- Spark ML은 직접적으로 K-Fold 기반으로 데이터를 선택하게 만드는 사이킷런의 KFold 클래스나 교차 검증 성능 결과만 반환해주 는 cross_val_score()는 지원하지 않음 (아래 sklearn 예제)
kfold = KFold(n_splits=5)
for train_index, test_index in kfold.split(iris_df):
X_train, X_test = iris_df.iloc[train_index], iris_df.iloc[test_index]
y_train, y_test = label_df.iloc[train_index], label_df.iloc[test_index]
scores = cross_val_score(dt_clf , data , label , scoring='accuracy',cv=3)
- Spark ML 은 CrossValidator 클래스를 통해서 교차 검증 지원 하지만 CrossValidator는 교차 검증만 수행하지 않고 하이퍼 파라미 터 튜닝까지 같이 수행하므로 사이킷런의 GridSearchCV와 유사 (아래 sklearn 예제)
grid_dtree = GridSearchCV(dtree, param_grid=parameters, cv=3, refit=True, return_train_score=True)
grid_dtree.fit(X_train, y_train)
scores_df = pd.DataFrame(grid_dtree.cv_results_)
scores_df[['params', 'mean_test_score', 'rank_test_score',
'split0_test_score', 'split1_test_score', 'split2_test_score']]
Spark ML 사용
cv = CrossValidator(estimator=dt,
estimatorParamMaps=param_grid, evaluator=evaluator_accuracy,
numFolds=3)
tvs = TrainValidationSplit(estimator=dt,
estimatorParamMaps=param_grid,
evaluator=evaluator_accuracy, trainRatio=0.75)
4.3. Feature Vectorization
Label encoding: StringIndexer 사용
문자형 -> 숫자로 변경
변환될 컬럼명(category)과 변환 후 컬럼명(category_index)을 입력
from pyspark.ml.feature import StringIndexer
df
> +---+--------+
| id|category|
+---+--------+
| 0| a|
| 1| b|
| 2| c|
| 3| a|
| 4| a|
| 5| c|
+---+--------+
indexer = StringIndexer(inputCol='category', outputCol='category_index')
# 여러 컬럼도 가능
# StringIndexer(inputCols=["category1", "category2"], outputCols=["label_encoded1", "label_encoded2"])
indexer_model = indexer.fit(df) # 변환될 df를 넣고 학습시킴
print(indexer_model) # 모델이 만들어짐
> StringIndexerModel: uid=StringIndexer_0ec8942b1277, handleInvalid=error
indexed_df = indexer_model.transform(df) # 만들어진 모델에 df를 넣고 transform하면 변환 후 df가 반환 됨
indexed_df.show()
> +---+--------+--------------+
| id|category|category_index|
+---+--------+--------------+
| 0| a| 0.0|
| 1| b| 2.0|
| 2| c| 1.0|
| 3| a| 0.0|
| 4| a| 0.0|
| 5| c| 1.0|
+---+--------+--------------+
숫자형 -> 문자형
원복 시키기
from pyspark.ml.feature import IndexToString
converter = IndexToString(inputCol='category_index', outputCol='original_category')
converted = converter.transform(indexed_df)
converted.show()
> +---+--------+--------------+-----------------+
| id|category|category_index|original_category|
+---+--------+--------------+-----------------+
| 0| a| 0.0| a|
| 1| b| 2.0| b|
| 2| c| 1.0| c|
| 3| a| 0.0| a|
| 4| a| 0.0| a|
| 5| c| 1.0| c|
+---+--------+--------------+-----------------+
One-hot encoding: OneHotEncoder 사용
크기를 벡터의 차원으로 하고, 표현하고 싶은 단어의 인덱스에 1의 값을 부여하고, 다른 인덱스에는 0을 부여하는 벡터 표현 방식
ex) 5개의 카테고리(0, 1, 2, 3, 4)가 있을 경우
0은 [0.1, 0.0, 0.0, 0.0]
1은 [0.0, 1.0, 0.0, 0.0]
2는 [0.0, 0.0, 1.0, 0.0]
3은 [0.0, 0.0, 0.0, 1.0]
4는 [0.0, 0.0, 0.0, 0.0]
OneHotEncoder될 컬럼은 반드시 숫자형으로 변환되어 있어야 함 따라서 OneHotEncoder를 String 컬럼에 적용 시에는 Label Encoding을 먼저 적용 후에 변환해야 함
dropLast는 마지막 인자를 제외할지를 나타냄 (default=True)
위 예시에서 마지막 카테고리인 4 [0.0, 0.0, 0.0, 0.0] 가 제외됨
OneHotEncoder는 sparse vector 형태로 onehot encoding 적용
from pyspark.ml.feature import OneHotEncoder
df
> +---+--------------------------+
| categoryIndex1|categoryIndex2|
+---+--------------------------+
| 0.0| 1.0|
| 1.0| 0.0|
| 2.0| 1.0|
| 0.0| 2.0|
| 0.0| 1.0|
| 2.0| 0.0|
+---+--------------------------+
encoder = OneHotEncoder(dropLast=True, inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["onehot_encoded1", "onehot_encoded2"])
encoded_model = encoder.fit(df) # 변환될 df를 넣고 학습시킴
encoded_df = encoded_model.transform(df) # 만들어진 모델에 df를 넣고 transform하면 변환 후 df가 반환 됨
#encoded_df = encoded_model.fit(df).transform(df) # 학습과 적합 한꺼번에도 가능
print(encoded_df.show())
display(encoded_df)
> +--------------+--------------+---------------+---------------+
|categoryIndex1|categoryIndex2|onehot_encoded1|onehot_encoded2|
+--------------+--------------+---------------+---------------+
| 0.0| 1.0| (3,[0],[1.0])| (3,[1],[1.0])|
| 1.0| 0.0| (3,[1],[1.0])| (3,[0],[1.0])|
| 2.0| 1.0| (3,[2],[1.0])| (3,[1],[1.0])|
| 0.0| 2.0| (3,[0],[1.0])| (3,[2],[1.0])|
| 0.0| 1.0| (3,[0],[1.0])| (3,[1],[1.0])|
| 2.0| 0.0| (3,[2],[1.0])| (3,[0],[1.0])|
+--------------+--------------+---------------+---------------+
여러 컬럼 한 컬럼으로 encoding: VectorAssembler 사용
기존 iris 컬럼들을 features라는 하나의 컬럼으로 변경
df
> +--------------+--------------+---------------+---------------+
'sepal_length'| 'sepal_width'| 'petal_length'| 'petal_width'
+--------------+--------------+---------------+---------------+
4.3 | 3.0 | 0.1 | 0.1
4.4 | 2.9 | 1.4 | 0.2
4.6 | 3.4 | 1.4 | 0.3
4.6 | 3.6 | 1.0 | 0.2
iris_columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
vec_assembler = VectorAssembler(inputCols=iris_columns, outputCol='features')
feature_vector_df = vec_assembler.transform(df) # vectorAssembler는 학습(fit)없이 바로 transform 호출
> feature_vector_df의 features 컬럼
[4.3, 3.0, 0.1, 0.1]
[4.4, 2.9, 1.4, 0.2]
[4.6, 3.4, 1.4, 0.3]
[4.6, 3.6, 1.0, 0.2]