📎 Spark User Guide, Spark The Definitive Guide


4. MLlib

4.1. MLlib

spark MLlib vs ScikitLearn

4.2. MLlib 구성요소

Transformer

: StringIndexer, OneHotEncoder, VectorAssebler, StandardScaler, Tokenizer

Estimator

: Classification, Regression, Clustering, Collaborative Filtering

from pyspark.ml.classification import DecisionTreeClassifier

dt_estimator = DecisionTreeClassifier(featuresCol='features', labelCol='Survived')
dt_model = dt_estimator.fit(train_sdf) # train 데이터로 학습 후 테스트 데이터로 예측 
predictions = dt_model.transform(test_sdf) # test 데이터로 적합 후 해당 결과로 정확도 평가

Evaluator

: RegressionEvaluator, BinaryClassificationEvaluator, MultiClassificationEvaluator

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

accuracy_evaluator = MulticlassClassificationEvaluator(labelCol='Survived', predictionCol='prediction', metricName='accuracy')
print(accuracy_evaluator.evaluate(predictions)) # 위 Estimator로 추출한 결과 predictions 넣어서 정확도 확인

Pipeline

from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

def encode_columns(sdf, input_cols=None, encode_gubun='label'):
    # label encoding과 one hot encoding 변환 컬럼명 지정. 
    label_cols = ['label_'+col for col in input_cols]
    onehot_cols = ['onehot_'+col for col in input_cols]
    
    #pipeline의 stages로 지정된 StringIndexer와 OneHotEncoder 객체 생성. 
    label_encoder_stage = StringIndexer(inputCols=input_cols, outputCols=label_cols)
    onehot_encoder_stage = OneHotEncoder(inputCols=label_cols, outputCols=onehot_cols)
    
    # encode_gubun이 label이면 StringIndexer stage만 등록, onehot이면 StringIndexer, OneHotEncoder 모두 등록. 
    stages = []
    if encode_gubun == 'label':
        stages = [label_encoder_stage]
    else:
        stages = [label_encoder_stage, onehot_encoder_stage]
        
    pipeline = Pipeline(stages=stages) # 위에서 만들어진 객체로 파이프라인 생성
    sdf = pipeline.fit(sdf).transform(sdf) # 파이프라인 학습 후 적합
    
    return sdf

Model Selection( and Tuning)

: randomSplit, TrainValidationSplit, CrossValidator, ParamGridBuilder

kfold = KFold(n_splits=5)
for train_index, test_index in kfold.split(iris_df):
X_train, X_test = iris_df.iloc[train_index], iris_df.iloc[test_index]
y_train, y_test = label_df.iloc[train_index], label_df.iloc[test_index]

scores = cross_val_score(dt_clf , data , label , scoring='accuracy',cv=3)
grid_dtree = GridSearchCV(dtree, param_grid=parameters, cv=3, refit=True, return_train_score=True)
grid_dtree.fit(X_train, y_train)
scores_df = pd.DataFrame(grid_dtree.cv_results_)
scores_df[['params', 'mean_test_score', 'rank_test_score', 
'split0_test_score', 'split1_test_score', 'split2_test_score']]

Spark ML 사용

cv = CrossValidator(estimator=dt, 
estimatorParamMaps=param_grid, evaluator=evaluator_accuracy, 
numFolds=3)

tvs = TrainValidationSplit(estimator=dt, 
estimatorParamMaps=param_grid, 
evaluator=evaluator_accuracy, trainRatio=0.75)

4.3. Feature Vectorization

Label encoding: StringIndexer 사용

문자형 -> 숫자로 변경
변환될 컬럼명(category)과 변환 후 컬럼명(category_index)을 입력

from pyspark.ml.feature import StringIndexer

df
> +---+--------+
  | id|category|
  +---+--------+
  |  0|       a|
  |  1|       b|
  |  2|       c|
  |  3|       a|
  |  4|       a|
  |  5|       c|
  +---+--------+

indexer = StringIndexer(inputCol='category', outputCol='category_index')
# 여러 컬럼도 가능
# StringIndexer(inputCols=["category1", "category2"], outputCols=["label_encoded1", "label_encoded2"])

indexer_model = indexer.fit(df) # 변환될 df를 넣고 학습시킴
print(indexer_model) # 모델이 만들어짐
> StringIndexerModel: uid=StringIndexer_0ec8942b1277, handleInvalid=error



indexed_df = indexer_model.transform(df) # 만들어진 모델에 df를 넣고 transform하면 변환 후 df가 반환 됨
indexed_df.show()
> +---+--------+--------------+
  | id|category|category_index|
  +---+--------+--------------+
  |  0|       a|           0.0|
  |  1|       b|           2.0|
  |  2|       c|           1.0|
  |  3|       a|           0.0|
  |  4|       a|           0.0|
  |  5|       c|           1.0|
  +---+--------+--------------+

숫자형 -> 문자형
원복 시키기

from pyspark.ml.feature import IndexToString

converter = IndexToString(inputCol='category_index', outputCol='original_category')
converted = converter.transform(indexed_df)
converted.show()
> +---+--------+--------------+-----------------+
  | id|category|category_index|original_category|
  +---+--------+--------------+-----------------+
  |  0|       a|           0.0|                a|
  |  1|       b|           2.0|                b|
  |  2|       c|           1.0|                c|
  |  3|       a|           0.0|                a|
  |  4|       a|           0.0|                a|
  |  5|       c|           1.0|                c|
  +---+--------+--------------+-----------------+

One-hot encoding: OneHotEncoder 사용

크기를 벡터의 차원으로 하고, 표현하고 싶은 단어의 인덱스에 1의 값을 부여하고, 다른 인덱스에는 0을 부여하는 벡터 표현 방식
ex) 5개의 카테고리(0, 1, 2, 3, 4)가 있을 경우
0은 [0.1, 0.0, 0.0, 0.0]
1은 [0.0, 1.0, 0.0, 0.0]
2는 [0.0, 0.0, 1.0, 0.0]
3은 [0.0, 0.0, 0.0, 1.0]
4는 [0.0, 0.0, 0.0, 0.0]

OneHotEncoder될 컬럼은 반드시 숫자형으로 변환되어 있어야 함 따라서 OneHotEncoder를 String 컬럼에 적용 시에는 Label Encoding을 먼저 적용 후에 변환해야 함

dropLast는 마지막 인자를 제외할지를 나타냄 (default=True)
위 예시에서 마지막 카테고리인 4 [0.0, 0.0, 0.0, 0.0] 가 제외됨

OneHotEncoder는 sparse vector 형태로 onehot encoding 적용

from pyspark.ml.feature import OneHotEncoder

df
> +---+--------------------------+
  | categoryIndex1|categoryIndex2|
  +---+--------------------------+
  |            0.0|           1.0|
  |            1.0|           0.0|
  |            2.0|           1.0|
  |            0.0|           2.0|
  |            0.0|           1.0|
  |            2.0|           0.0|
  +---+--------------------------+
  
encoder = OneHotEncoder(dropLast=True, inputCols=["categoryIndex1", "categoryIndex2"],
                        outputCols=["onehot_encoded1", "onehot_encoded2"])
encoded_model = encoder.fit(df) # 변환될 df를 넣고 학습시킴

encoded_df = encoded_model.transform(df) # 만들어진 모델에 df를 넣고 transform하면 변환 후 df가 반환 됨
#encoded_df = encoded_model.fit(df).transform(df) # 학습과 적합 한꺼번에도 가능

print(encoded_df.show())
display(encoded_df)  
> +--------------+--------------+---------------+---------------+
  |categoryIndex1|categoryIndex2|onehot_encoded1|onehot_encoded2|
  +--------------+--------------+---------------+---------------+
  |           0.0|           1.0|  (3,[0],[1.0])|  (3,[1],[1.0])|
  |           1.0|           0.0|  (3,[1],[1.0])|  (3,[0],[1.0])|
  |           2.0|           1.0|  (3,[2],[1.0])|  (3,[1],[1.0])|
  |           0.0|           2.0|  (3,[0],[1.0])|  (3,[2],[1.0])|
  |           0.0|           1.0|  (3,[0],[1.0])|  (3,[1],[1.0])|
  |           2.0|           0.0|  (3,[2],[1.0])|  (3,[0],[1.0])|
  +--------------+--------------+---------------+---------------+

  

여러 컬럼 한 컬럼으로 encoding: VectorAssembler 사용

기존 iris 컬럼들을 features라는 하나의 컬럼으로 변경

df
> +--------------+--------------+---------------+---------------+
   'sepal_length'| 'sepal_width'| 'petal_length'| 'petal_width'
  +--------------+--------------+---------------+---------------+
    4.3          | 3.0          | 0.1           | 0.1
    4.4          | 2.9          | 1.4           | 0.2
    4.6          | 3.4          | 1.4           | 0.3
    4.6          | 3.6          | 1.0           | 0.2
 
iris_columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
vec_assembler = VectorAssembler(inputCols=iris_columns, outputCol='features')
feature_vector_df = vec_assembler.transform(df) # vectorAssembler는 학습(fit)없이 바로 transform 호출
> feature_vector_df의 features 컬럼
[4.3, 3.0, 0.1, 0.1]  
[4.4, 2.9, 1.4, 0.2]  
[4.6, 3.4, 1.4, 0.3]  
[4.6, 3.6, 1.0, 0.2] 



5. MLlib Classification



6. MLlib Regression



7. Use Case