各ECモールの商品カテゴリ/商品分類を技術を使って一致させたい(カテゴリーをマッチさせる)
前回の続きです。
今回は pythonの環境と実装したスクリプトの挙動について簡単に説明します。
環境設定
pythonの実行環境は anaconda 4.8.5 を使っています。
今回は
- 日本語の処理
- MeCab
- ipadic
- jaconv
- word2vec
- gensim
の2つをやろうと思っています。
今回用の環境の作成
今回のスクリプトを実行させる環境をゼロから作ってみました。
conda create -p D:\\----\\envs\\word2vec conda activate word2vec conda install pip
日本語の分かち書き
pip install MeCab ipadic jaconv
word2vec
- pythonライブラリ
- gensim
- 学習済みデータ
- chiVe
pip install gensim
chiVeは以下からダウンロードします。
https://github.com/WorksApplications/chiVe
今回は v1.2 mc5 を使います。
実装
流れ
- 比較をする2つのCSVを読む
- word2vec
- 2つのECマーケットのカテゴリー群を全組み合わせでループ。
- 比較対象になる2つのカテゴリーに紐づく単語すべての特徴ベクトルの平均を算出し、類似度を算出。
ソースコード全体
長いのですが、まずは全体を
"""ECマーケットカテゴリーの一致候補を出力するモジュール """ import csv from logging import Logger, getLogger, DEBUG, FileHandler, StreamHandler, Formatter import re import MeCab import ipadic import gensim import time import jaconv import numpy as np from scipy import spatial import os from typing import Callable class Timer(object): """タイマークラス 参考: https://gist.github.com/imflop/7e481ec32874b78992e4 """ def __init__( self, verbose: bool=False, logger:Logger=getLogger(__name__)) -> None: self.verbose:bool = verbose self.logger:Logger = logger def __enter__(self): self.start = time.time() return self def __exit__(self, *args): self.end = time.time() self.secs = self.end - self.start self.msecs = self.secs * 1000 if self.verbose: self.logger.debug(f'elapsed time: {self.msecs} ms') class Word2VecComparer: """ word2vecによる比較クラス 参考:https://qiita.com/yoppe/items/512c7c072d08c64afa7e """ def __init__(self, w2v_model, num_features:int) -> None: self.w2v_model = w2v_model self.num_features = num_features def avg_feature_vector(self, words): feature_vec = np.zeros((self.num_features,), dtype='float32') words_len = len(words) for word in words: if word in self.w2v_model.key_to_index: feature_vec = np.add(feature_vec, self.w2v_model[word]) else: # 辞書にない言葉はスルー。 words_len -= 1 if words_len > 0: feature_vec = np.divide(feature_vec, words_len) return feature_vec def sentence_similarity(self, words_1, words_2): sentence_1_avg_vector = self.avg_feature_vector(words_1) sentence_2_avg_vector = self.avg_feature_vector(words_2) # すべてゼロのベクターは spatial.distance.cosineでnanを返すはずなのに # 返さないケースがあったのでチェック if np.all(sentence_1_avg_vector==0) or np.all(sentence_2_avg_vector==0): return np.nan # 1からベクトル間の距離を引いてあげることで、コサイン類似度を計算 return 1 - spatial.distance.cosine( sentence_1_avg_vector, sentence_2_avg_vector) class WordSeparatedCategory: """分かち書きや階層構造で単語に分割されたカテゴリーのクラス """ split_pattern = re.compile(r'[>・、/\¥&]') """階層化しているカテゴリーを分割するためのパターン """ mecab = MeCab.Tagger(ipadic.MECAB_ARGS + ' -Owakati') """ipadicと分かち書きを設定したMeCab """ def __init__(self, category_id:str, category_path:str) -> None: """[summary] カテゴリーIDとカテゴリーパス(親のカテゴリーも含めた階層構造)で初期化する。 カテゴリーパスはパスの区切りとMeCabで各単語を配列にする。 Args: id (str): カテゴリーのID category_path (str): カテゴリーのパス """ self.category_id = category_id self.category_path = category_path parts = [] # 全角に変換する。 zen = jaconv.h2z(category_path) # 区切り文字で分割する。word2vecの辞書に存在しやすいよう、特殊記号も含めて分割する。 words = [words for words in WordSeparatedCategory.split_pattern.split( zen)] # 分割した語句をMeCabで分かち書きする。 for word in words: parts.extend( WordSeparatedCategory.mecab.parse(word).replace( ' \n', '').split()) # 重複の削除 self.path_words = set(parts) class CompareResult: """比較結果クラス。比較対象2つと比較結果の値を持つ """ def __init__(self, a:WordSeparatedCategory, b:WordSeparatedCategory) -> None: self.a:WordSeparatedCategory = a self.b:WordSeparatedCategory = b self.similarity:float = 0 def setup_logger( name:str, logfile:str=f'{os.path.splitext(__file__)[0]}-log.txt'): """ロガーのセットアップ 参考: https://qiita.com/shotakaha/items/0fa2db1dc8253c83e2bb Args: name (str): ロガー名 logfile (str, optional): 出力されるログファイル名. Defaults to 'debuglog.txt'. """ logger:Logger = getLogger(name) logger.setLevel(DEBUG) fh:FileHandler = FileHandler(logfile) fh.setLevel(DEBUG) fh_formatter = Formatter( '%(asctime)s - %(levelname)s - %(filename)s \ - %(name)s - %(funcName)s - %(message)s') fh.setFormatter(fh_formatter) ch:StreamHandler = StreamHandler() ch.setLevel(DEBUG) ch_formatter = Formatter( '%(asctime)s - %(levelname)s - %(message)s', '%Y-%m-%d %H:%M:%S') ch.setFormatter(ch_formatter) logger.addHandler(fh) logger.addHandler(ch) def conv_csv_to_ws_categories(path:str): """特定フォーマットのCSVを読み込んで WordSeparatedCategory オブジェクトのリストに変換する。 Args: path (str): 開く対象のCSVのパス Returns: CategoryParts[]: WordSeparatedCategory オブジェクトのリスト """ getLogger(__name__).debug('Read csv %s', path) with open(path, newline='', encoding='utf-8') as f: reader = csv.DictReader(f) return [WordSeparatedCategory( row['CategoryId'], row['CategoryPath']) for row in reader] def conv_results_to_dict( results:list[CompareResult], key:Callable[[CompareResult], WordSeparatedCategory] ) -> dict[WordSeparatedCategory, list[CompareResult]]: """カテゴリーペアの辞書化。 各要素にCompareResult形式の要素を持つリストを元に、 keyをCompareResultのいずれかの要素、 valueをCompareResultのリストの形式の辞書を返却する。 Args: results (list): 各要素に CompareResult の要素を持つリスト key (lambda): 辞書のキーとなるオブジェクトを取得する関数 """ result_dict = {} # Aに対してBの候補を表示するために、Aカテ:Bカテリスト の構造辞書を構築する for pair in results: result_dict.setdefault(key(pair), []).append(pair) # ソートして先頭10件のみ出力する。先頭が1の場合(完全一致)は先頭のみとする。 for k, v in result_dict.items(): v.sort(key=lambda x: 0 if np.isnan(x.similarity) else x.similarity, reverse=True) result_dict[k] = [v[0]] if v[0].similarity == 1 else v[:10] return result_dict def main(): """メイン処理 """ # 定数 model_path = r'chive-1.2-mc5_gensim\chive-1.2-mc5.kv' a_name = 'auPayMarket' b_name = 'yahooShopping' csv_path_a = f'{a_name}_category.csv' csv_path_b = f'{b_name}_category.csv' csv_path_result_a2b = f'result_{a_name}-{b_name}.csv' csv_path_result_b2a = f'result_{b_name}-{a_name}.csv' # init logger setup_logger(__name__) logger = getLogger(__name__) logger.debug('---カテゴリー類似率計算---') logger.debug('Word2Vecモデル:%s', model_path) logger.debug('比較カテゴリーA:%s', csv_path_a) logger.debug('比較カテゴリーB:%s', csv_path_b) # CSVの読み込み with Timer(True): cats_a = conv_csv_to_ws_categories(csv_path_a) with Timer(True): cats_b = conv_csv_to_ws_categories(csv_path_b) # gensimモデルの読み込み logger.debug('Load model:%s', model_path) with Timer(True): model = gensim.models.KeyedVectors.load(model_path) print(type(model)) comparer = Word2VecComparer(model, 300) # カテゴリー比較ペアを作る(AカテとBカテ全組み合わせ) results = [CompareResult(cat_a, cat_b) \ for cat_a in cats_a for cat_b in cats_b] logger.debug('Calc similarity count:%s', len(results)) with Timer(True): for result in results: result.similarity = comparer.sentence_similarity( result.a.path_words, result.b.path_words) # CSV出力 # AのカテゴリーからBが決まるパターンとその逆を考慮し、2通り出力する logger.debug('Write csv: A to B') with Timer(True): dict_a2bs = conv_results_to_dict(results, lambda result: result.a) with open(csv_path_result_a2b, 'w', encoding='utf-8', newline='') as f: writer = csv.writer(f) writer.writerow([ f'id-{a_name}', f'category-{a_name}', f'id-{b_name}', f'category-{b_name}', 'similarity']) for k, v in dict_a2bs.items(): writer.writerows([[ k.category_id, k.category_path, p.b.category_id, p.b.category_path, p.similarity] for p in v]) logger.debug('Write csv: B to A') with Timer(True): dict_b2as = conv_results_to_dict(results, lambda result: result.b) with open(csv_path_result_b2a, 'w', encoding='utf-8', newline='') as f: writer = csv.writer(f) writer.writerow([ f'id-{b_name}', f'category-{b_name}', f'id-{a_name}', f'category-{a_name}', 'similarity']) for k, v in dict_b2as.items(): writer.writerows([[ k.category_id, k.category_path, p.a.category_id, p.a.category_path, p.similarity] for p in v]) logger.debug('---カテゴリー類似率計算 完了---') if __name__ == '__main__': main()
部分解説 Timer クラス
class Timer(object): """タイマークラス 参考: https://gist.github.com/imflop/7e481ec32874b78992e4 """ def __init__( self, verbose: bool=False, logger:Logger=getLogger(__name__)) -> None: self.verbose:bool = verbose self.logger:Logger = logger def __enter__(self): self.start = time.time() return self def __exit__(self, *args): self.end = time.time() self.secs = self.end - self.start self.msecs = self.secs * 1000 if self.verbose: self.logger.debug(f'elapsed time: {self.msecs} ms')
このクラスは実行時間を計測するために検索していて見つけたクラスを少し変更したものになっています。
元のソースコードは以下リンク Python class for elapsed time in sec or msec · GitHub
変更点は以下です。
- 初期化時に Logger オブジェクトを渡す構造にした。
- print ではなく debug で出力するようにした。
- VSCodeの候補表示を有効にするために型を明示した。
部分解説 Word2VecComparer クラス
class Word2VecComparer: """ word2vecによる比較クラス 参考:https://qiita.com/yoppe/items/512c7c072d08c64afa7e """ def __init__(self, w2v_model, num_features:int) -> None: self.w2v_model = w2v_model self.num_features = num_features def avg_feature_vector(self, words): feature_vec = np.zeros((self.num_features,), dtype='float32') words_len = len(words) for word in words: if word in self.w2v_model.key_to_index: feature_vec = np.add(feature_vec, self.w2v_model[word]) else: # 辞書にない言葉はスルー。 words_len -= 1 if words_len > 0: feature_vec = np.divide(feature_vec, words_len) return feature_vec def sentence_similarity(self, words_1, words_2): sentence_1_avg_vector = self.avg_feature_vector(words_1) sentence_2_avg_vector = self.avg_feature_vector(words_2) # すべてゼロのベクターは spatial.distance.cosineでnanを返すはずなのに # 返さないケースがあったのでチェック if np.all(sentence_1_avg_vector==0) or np.all(sentence_2_avg_vector==0): return np.nan # 1からベクトル間の距離を引いてあげることで、コサイン類似度を計算 return 1 - spatial.distance.cosine( sentence_1_avg_vector, sentence_2_avg_vector)
下記記事を参考にして書いた、複数の単語の組み合わせの類似度を算出するクラスです。
元の処理と異なっている点は
- 初期化時にgensimモデル(KeyedVectors)オブジェクトと次元の数値を取るように変更
- 分かち書きだけでなく、重複削除や全角変換など処理が多くなったので、「文章を受けてMeCabで分かち書き」ではなく「単語のリストを受ける」ように変更
- 辞書にない言葉はなかったものとして類似度を算出するように変更
- すべてゼロのベクターなのにspatial.distance.cosine の戻り値がnanを返さなかったのでチェック処理を追加
部分解説 WordSeparatedCategory クラス
class WordSeparatedCategory: """分かち書きや階層構造で単語に分割されたカテゴリーのクラス """ split_pattern = re.compile(r'[>・、/\¥&]') """階層化しているカテゴリーを分割するためのパターン """ mecab = MeCab.Tagger(ipadic.MECAB_ARGS + ' -Owakati') """ipadicと分かち書きを設定したMeCab """ def __init__(self, category_id:str, category_path:str) -> None: """[summary] カテゴリーIDとカテゴリーパス(親のカテゴリーも含めた階層構造)で初期化する。 カテゴリーパスはパスの区切りとMeCabで各単語を配列にする。 Args: id (str): カテゴリーのID category_path (str): カテゴリーのパス """ self.category_id = category_id self.category_path = category_path parts = [] # 全角に変換する。 zen = jaconv.h2z(category_path) # 区切り文字で分割する。word2vecの辞書に存在しやすいよう、特殊記号も含めて分割する。 words = [words for words in WordSeparatedCategory.split_pattern.split( zen)] # 分割した語句をMeCabで分かち書きする。 for word in words: parts.extend( WordSeparatedCategory.mecab.parse(word).replace( ' \n', '').split()) # 重複の削除 self.path_words = set(parts)
分かち書きや階層構造で単語に分割されたカテゴリーのクラスです。
以下のインスタンス変数を持ちます。
- カテゴリーID
- カテゴリーパス(階層の記号などをそのままにしたカテゴリー名称)
- パスの単語リスト(set)
以下のようにカテゴリーを単語レベルに分割し、word2vecでの分析のための準備をします。
a=WordSeparatedCategory('00000','ファッション>レディースファッション>アウター>ピーコート') print(a.path_words) #=>{'レディース', 'ピーコート', 'ファッション', 'アウター'} a =WordSeparatedCategory('00000','食品・飲み物>ジュース、清涼飲料水') print(a.path_words) #=>{'清涼飲料水', '食品', 'ジュース', '飲み物'}
部分解説 CompareResult クラス
class CompareResult: """比較結果クラス。比較対象2つと比較結果の値を持つ """ def __init__(self, a:WordSeparatedCategory, b:WordSeparatedCategory) -> None: self.a:WordSeparatedCategory = a self.b:WordSeparatedCategory = b self.similarity:float = 0
比較結果のクラスです。
以下のインスタンス変数を持ちます。
- 比較対象のカテゴリー A
- 比較対象のカテゴリー B
- 比較結果の数値
入れ物のクラスですね。特に処理は入れていません。
部分解説 関数 setup_logger
def setup_logger( name:str, logfile:str=f'{os.path.splitext(__file__)[0]}-log.txt'): """ロガーのセットアップ 参考: https://qiita.com/shotakaha/items/0fa2db1dc8253c83e2bb Args: name (str): ロガー名 logfile (str, optional): 出力されるログファイル名. Defaults to 'debuglog.txt'. """ logger:Logger = getLogger(name) logger.setLevel(DEBUG) fh:FileHandler = FileHandler(logfile) fh.setLevel(DEBUG) fh_formatter = Formatter( '%(asctime)s - %(levelname)s - %(filename)s \ - %(name)s - %(funcName)s - %(message)s') fh.setFormatter(fh_formatter) ch:StreamHandler = StreamHandler() ch.setLevel(DEBUG) ch_formatter = Formatter( '%(asctime)s - %(levelname)s - %(message)s', '%Y-%m-%d %H:%M:%S') ch.setFormatter(ch_formatter) logger.addHandler(fh) logger.addHandler(ch)
ロガーを初期化する関数です。 下記の記事を参考にして実装しました。
今まで print でログをしていましたが、Logger というものを知り、勉強のために使ってみました。 ログのファイル出力が簡単にできるのは嬉しいです。
部分解説 関数 conv_csv_to_ws_categories
def conv_csv_to_ws_categories(path:str): """特定フォーマットのCSVを読み込んで WordSeparatedCategory オブジェクトのリストに変換する。 Args: path (str): 開く対象のCSVのパス Returns: CategoryParts[]: WordSeparatedCategory オブジェクトのリスト """ getLogger(__name__).debug('Read csv %s', path) with open(path, newline='', encoding='utf-8') as f: reader = csv.DictReader(f) return [WordSeparatedCategory( row['CategoryId'], row['CategoryPath']) for row in reader]
特定フォーマットのCSVを読み込んで WordSeparatedCategory オブジェクトのリストに変換します。
特定フォーマットとは以下の列を持つCSVです。
- CategoryId
- CategoryPath
この関数では csv.DictReader を使って辞書でアクセスすることを試しました。
部分解説 関数 conv_results_to_dict
def conv_results_to_dict( results:list[CompareResult], key:Callable[[CompareResult], WordSeparatedCategory] ) -> dict[WordSeparatedCategory, list[CompareResult]]: """カテゴリーペアの辞書化。 各要素にCompareResult形式の要素を持つリストを元に、 keyをCompareResultのいずれかの要素、 valueをCompareResultのリストの形式の辞書を返却する。 Args: results (list): 各要素に CompareResult の要素を持つリスト key (lambda): 辞書のキーとなるオブジェクトを取得する関数 """ result_dict = {} # Aに対してBの候補を表示するために、Aカテ:Bカテリスト の構造辞書を構築する for pair in results: result_dict.setdefault(key(pair), []).append(pair) # ソートして先頭10件のみ出力する。先頭が1の場合(完全一致)は先頭のみとする。 for k, v in result_dict.items(): v.sort(key=lambda x: 0 if np.isnan(x.similarity) else x.similarity, reverse=True) result_dict[k] = [v[0]] if v[0].similarity == 1 else v[:10] return result_dict
各要素にCompareResult形式の要素を持つリストを元に、keyをCompareResultのWordSeparatedCategoryのいずれか、valueをCompareResultのリストの形式の辞書を返却する関数です。
これはカテゴリーの一致を考える上で、粒度が比較するECによって違うことで1対1に決まらない事が多いため、片方のカテゴリーに対して候補のカテゴリーリストを出力するために実装しました。
部分解説 関数 main
def main(): """メイン処理 """ # 定数 model_path = r'chive-1.2-mc5_gensim\chive-1.2-mc5.kv' a_name = 'auPayMarket' b_name = 'yahooShopping' csv_path_a = f'{a_name}_category.csv' csv_path_b = f'{b_name}_category.csv' csv_path_result_a2b = f'result_{a_name}-{b_name}.csv' csv_path_result_b2a = f'result_{b_name}-{a_name}.csv' # init logger setup_logger(__name__) logger = getLogger(__name__) logger.debug('---カテゴリー類似率計算---') logger.debug('Word2Vecモデル:%s', model_path) logger.debug('比較カテゴリーA:%s', csv_path_a) logger.debug('比較カテゴリーB:%s', csv_path_b) # CSVの読み込み with Timer(True): cats_a = conv_csv_to_ws_categories(csv_path_a) with Timer(True): cats_b = conv_csv_to_ws_categories(csv_path_b) # gensimモデルの読み込み logger.debug('Load model:%s', model_path) with Timer(True): model = gensim.models.KeyedVectors.load(model_path) print(type(model)) comparer = Word2VecComparer(model, 300) # カテゴリー比較ペアを作る(AカテとBカテ全組み合わせ) results = [CompareResult(cat_a, cat_b) \ for cat_a in cats_a for cat_b in cats_b] logger.debug('Calc similarity count:%s', len(results)) with Timer(True): for result in results: result.similarity = comparer.sentence_similarity( result.a.path_words, result.b.path_words) # CSV出力 # AのカテゴリーからBが決まるパターンとその逆を考慮し、2通り出力する logger.debug('Write csv: A to B') with Timer(True): dict_a2bs = conv_results_to_dict(results, lambda result: result.a) with open(csv_path_result_a2b, 'w', encoding='utf-8', newline='') as f: writer = csv.writer(f) writer.writerow([ f'id-{a_name}', f'category-{a_name}', f'id-{b_name}', f'category-{b_name}', 'similarity']) for k, v in dict_a2bs.items(): writer.writerows([[ k.category_id, k.category_path, p.b.category_id, p.b.category_path, p.similarity] for p in v]) logger.debug('Write csv: B to A') with Timer(True): dict_b2as = conv_results_to_dict(results, lambda result: result.b) with open(csv_path_result_b2a, 'w', encoding='utf-8', newline='') as f: writer = csv.writer(f) writer.writerow([ f'id-{b_name}', f'category-{b_name}', f'id-{a_name}', f'category-{a_name}', 'similarity']) for k, v in dict_b2as.items(): writer.writerows([[ k.category_id, k.category_path, p.a.category_id, p.a.category_path, p.similarity] for p in v]) logger.debug('---カテゴリー類似率計算 完了---')
メイン処理です。
全体的に Timerクラスを活用して実行時間をログするようにしています。
流れ
- CSVよんでカテゴリーのリストを取得する・・・①
- gensim モデル読む(これが80秒前後かかる)
- ①の2つのリストを全組み合わせで CompareteResult オブジェクトリストを作る・・・②
- ②をすべてループして類似度を計算する。
- CSV出力する
CSV出力は2通り出力するのですが、うまいこと関数化ができず、冗長な書き方になってしまっています。
以上がスクリプトの解説です。
次回はマッチさせた結果について考えてみようと思います。