命名って難しい

変数、関数、クラスなどなど実装より命名に毎回悩むタイプの人間による技術についてのメモ。

各ECモールの商品カテゴリ/商品分類を技術を使って一致させたい(カテゴリーをマッチさせる)

前回の続きです。

notshown.hatenablog.jp

今回は pythonの環境と実装したスクリプトの挙動について簡単に説明します。

環境設定

pythonの実行環境は anaconda 4.8.5 を使っています。

今回は

  • 日本語の処理
  • word2vec
    • gensim

の2つをやろうと思っています。

今回用の環境の作成

今回のスクリプトを実行させる環境をゼロから作ってみました。

conda create -p D:\\----\\envs\\word2vec
conda activate word2vec
conda install pip

日本語の分かち書き

pip install MeCab ipadic jaconv

word2vec

  • pythonライブラリ
    • gensim
  • 学習済みデータ
    • chiVe
pip install gensim

chiVeは以下からダウンロードします。

https://github.com/WorksApplications/chiVe

今回は v1.2 mc5 を使います。

実装

流れ

  • 比較をする2つのCSVを読む
    • 各カテゴリーを階層別に分ける。
      • "ファッション>レディースファッション" → ["ファッション","レディースファッション"]
    • 各階層のカテゴリー名をMeCab分かち書きする。
      • ["ファッション","レディースファッション"] → ["ファッション","レディース","ファッション"]
    • すべての単語の重複を削除する
      • ["ファッション","レディース","ファッション"] → ["レディース","ファッション"]
  • word2vec
    • 2つのECマーケットのカテゴリー群を全組み合わせでループ。
    • 比較対象になる2つのカテゴリーに紐づく単語すべての特徴ベクトルの平均を算出し、類似度を算出。

ソースコード全体

長いのですが、まずは全体を

"""ECマーケットカテゴリーの一致候補を出力するモジュール

"""

import csv
from logging import Logger, getLogger, DEBUG, FileHandler, StreamHandler, Formatter
import re
import MeCab
import ipadic
import gensim
import time
import jaconv
import numpy as np
from scipy import spatial
import os
from typing import Callable


class Timer(object):
  """タイマークラス
  参考: https://gist.github.com/imflop/7e481ec32874b78992e4
  """
  def __init__(
    self,
    verbose: bool=False,
    logger:Logger=getLogger(__name__)) -> None:
    self.verbose:bool = verbose
    self.logger:Logger = logger

  def __enter__(self):
    self.start = time.time()
    return self

  def __exit__(self, *args):
    self.end = time.time()
    self.secs = self.end - self.start
    self.msecs = self.secs * 1000
    if self.verbose:
      self.logger.debug(f'elapsed time: {self.msecs} ms')


class Word2VecComparer:
  """ word2vecによる比較クラス
  参考:https://qiita.com/yoppe/items/512c7c072d08c64afa7e
  """
  def __init__(self, w2v_model, num_features:int) -> None:
    self.w2v_model = w2v_model
    self.num_features = num_features

  def avg_feature_vector(self, words):
    feature_vec = np.zeros((self.num_features,), dtype='float32')
    words_len = len(words)
    for word in words:
      if word in self.w2v_model.key_to_index:
        feature_vec = np.add(feature_vec, self.w2v_model[word])
      else:
        # 辞書にない言葉はスルー。
        words_len -= 1
    if words_len > 0:
      feature_vec = np.divide(feature_vec, words_len)
    return feature_vec

  def sentence_similarity(self, words_1, words_2):
    sentence_1_avg_vector = self.avg_feature_vector(words_1)
    sentence_2_avg_vector = self.avg_feature_vector(words_2)

    # すべてゼロのベクターは spatial.distance.cosineでnanを返すはずなのに
    # 返さないケースがあったのでチェック
    if np.all(sentence_1_avg_vector==0) or np.all(sentence_2_avg_vector==0):
      return np.nan

    # 1からベクトル間の距離を引いてあげることで、コサイン類似度を計算
    return 1 - spatial.distance.cosine(
      sentence_1_avg_vector, sentence_2_avg_vector)


class WordSeparatedCategory:
  """分かち書きや階層構造で単語に分割されたカテゴリーのクラス
  """

  split_pattern = re.compile(r'[>・、/\¥&]')
  """階層化しているカテゴリーを分割するためのパターン
  """

  mecab = MeCab.Tagger(ipadic.MECAB_ARGS + ' -Owakati')
  """ipadicと分かち書きを設定したMeCab
  """

  def __init__(self, category_id:str, category_path:str) -> None:
    """[summary]
    カテゴリーIDとカテゴリーパス(親のカテゴリーも含めた階層構造)で初期化する。
    カテゴリーパスはパスの区切りとMeCabで各単語を配列にする。

    Args:
      id (str): カテゴリーのID
      category_path (str): カテゴリーのパス
    """
    self.category_id = category_id
    self.category_path = category_path
    parts = []

    # 全角に変換する。
    zen = jaconv.h2z(category_path)
    # 区切り文字で分割する。word2vecの辞書に存在しやすいよう、特殊記号も含めて分割する。
    words = [words for words in WordSeparatedCategory.split_pattern.split(
      zen)]

    # 分割した語句をMeCabで分かち書きする。
    for word in words:
      parts.extend(
        WordSeparatedCategory.mecab.parse(word).replace(
          ' \n', '').split())

    # 重複の削除
    self.path_words = set(parts)


class CompareResult:
  """比較結果クラス。比較対象2つと比較結果の値を持つ
  """
  def __init__(self, a:WordSeparatedCategory, b:WordSeparatedCategory) -> None:
    self.a:WordSeparatedCategory = a
    self.b:WordSeparatedCategory = b
    self.similarity:float = 0


def setup_logger(
    name:str, logfile:str=f'{os.path.splitext(__file__)[0]}-log.txt'):
  """ロガーのセットアップ
  参考: https://qiita.com/shotakaha/items/0fa2db1dc8253c83e2bb

  Args:
    name (str): ロガー名
    logfile (str, optional): 出力されるログファイル名. Defaults to 'debuglog.txt'.

  """

  logger:Logger = getLogger(name)
  logger.setLevel(DEBUG)

  fh:FileHandler = FileHandler(logfile)
  fh.setLevel(DEBUG)
  fh_formatter = Formatter(
    '%(asctime)s - %(levelname)s - %(filename)s \
        - %(name)s - %(funcName)s - %(message)s')
  fh.setFormatter(fh_formatter)

  ch:StreamHandler = StreamHandler()
  ch.setLevel(DEBUG)
  ch_formatter = Formatter(
    '%(asctime)s - %(levelname)s - %(message)s', '%Y-%m-%d %H:%M:%S')
  ch.setFormatter(ch_formatter)

  logger.addHandler(fh)
  logger.addHandler(ch)


def conv_csv_to_ws_categories(path:str):
  """特定フォーマットのCSVを読み込んで WordSeparatedCategory オブジェクトのリストに変換する。

  Args:
    path (str): 開く対象のCSVのパス

  Returns:
    CategoryParts[]: WordSeparatedCategory オブジェクトのリスト
  """

  getLogger(__name__).debug('Read csv %s', path)

  with open(path, newline='', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    return [WordSeparatedCategory(
      row['CategoryId'], row['CategoryPath']) for row in reader]


def conv_results_to_dict(
    results:list[CompareResult],
    key:Callable[[CompareResult], WordSeparatedCategory]
    ) -> dict[WordSeparatedCategory, list[CompareResult]]:
  """カテゴリーペアの辞書化。
  各要素にCompareResult形式の要素を持つリストを元に、
  keyをCompareResultのいずれかの要素、
  valueをCompareResultのリストの形式の辞書を返却する。

  Args:
    results (list): 各要素に CompareResult の要素を持つリスト
    key (lambda): 辞書のキーとなるオブジェクトを取得する関数
  """
  result_dict = {}

  # Aに対してBの候補を表示するために、Aカテ:Bカテリスト の構造辞書を構築する
  for pair in results:
    result_dict.setdefault(key(pair), []).append(pair)

  # ソートして先頭10件のみ出力する。先頭が1の場合(完全一致)は先頭のみとする。
  for k, v in result_dict.items():
    v.sort(key=lambda x:
      0 if np.isnan(x.similarity) else x.similarity, reverse=True)
    result_dict[k] = [v[0]] if v[0].similarity == 1 else v[:10]

  return result_dict


def main():
  """メイン処理
  """
  # 定数
  model_path = r'chive-1.2-mc5_gensim\chive-1.2-mc5.kv'
  a_name = 'auPayMarket'
  b_name = 'yahooShopping'
  csv_path_a = f'{a_name}_category.csv'
  csv_path_b = f'{b_name}_category.csv'
  csv_path_result_a2b = f'result_{a_name}-{b_name}.csv'
  csv_path_result_b2a = f'result_{b_name}-{a_name}.csv'

  # init logger
  setup_logger(__name__)
  logger = getLogger(__name__)
  logger.debug('---カテゴリー類似率計算---')
  logger.debug('Word2Vecモデル:%s', model_path)
  logger.debug('比較カテゴリーA:%s', csv_path_a)
  logger.debug('比較カテゴリーB:%s', csv_path_b)

  # CSVの読み込み
  with Timer(True):
    cats_a = conv_csv_to_ws_categories(csv_path_a)
  with Timer(True):
    cats_b = conv_csv_to_ws_categories(csv_path_b)

  # gensimモデルの読み込み
  logger.debug('Load model:%s', model_path)
  with Timer(True):
    model = gensim.models.KeyedVectors.load(model_path)
    print(type(model))
  comparer = Word2VecComparer(model, 300)

  # カテゴリー比較ペアを作る(AカテとBカテ全組み合わせ)
  results = [CompareResult(cat_a, cat_b) \
      for cat_a in cats_a for cat_b in cats_b]

  logger.debug('Calc similarity count:%s', len(results))
  with Timer(True):
    for result in results:
      result.similarity = comparer.sentence_similarity(
        result.a.path_words, result.b.path_words)

  # CSV出力
  # AのカテゴリーからBが決まるパターンとその逆を考慮し、2通り出力する
  logger.debug('Write csv: A to B')
  with Timer(True):
    dict_a2bs = conv_results_to_dict(results, lambda result: result.a)
    with open(csv_path_result_a2b, 'w', encoding='utf-8', newline='') as f:
      writer = csv.writer(f)
      writer.writerow([
        f'id-{a_name}', f'category-{a_name}',
        f'id-{b_name}', f'category-{b_name}',
        'similarity'])
      for k, v in dict_a2bs.items():
        writer.writerows([[
            k.category_id, k.category_path,
            p.b.category_id, p.b.category_path,
            p.similarity] for p in v])

  logger.debug('Write csv: B to A')
  with Timer(True):
    dict_b2as = conv_results_to_dict(results, lambda result: result.b)
    with open(csv_path_result_b2a, 'w', encoding='utf-8', newline='') as f:
      writer = csv.writer(f)
      writer.writerow([
        f'id-{b_name}', f'category-{b_name}',
        f'id-{a_name}', f'category-{a_name}',
        'similarity'])
      for k, v in dict_b2as.items():
        writer.writerows([[
            k.category_id, k.category_path,
            p.a.category_id, p.a.category_path,
            p.similarity] for p in v])

  logger.debug('---カテゴリー類似率計算 完了---')


if __name__ == '__main__':
  main()

部分解説 Timer クラス

class Timer(object):
  """タイマークラス
  参考: https://gist.github.com/imflop/7e481ec32874b78992e4
  """
  def __init__(
    self,
    verbose: bool=False,
    logger:Logger=getLogger(__name__)) -> None:
    self.verbose:bool = verbose
    self.logger:Logger = logger

  def __enter__(self):
    self.start = time.time()
    return self

  def __exit__(self, *args):
    self.end = time.time()
    self.secs = self.end - self.start
    self.msecs = self.secs * 1000
    if self.verbose:
      self.logger.debug(f'elapsed time: {self.msecs} ms')

このクラスは実行時間を計測するために検索していて見つけたクラスを少し変更したものになっています。

元のソースコードは以下リンク Python class for elapsed time in sec or msec · GitHub

変更点は以下です。

  • 初期化時に Logger オブジェクトを渡す構造にした。
  • print ではなく debug で出力するようにした。
  • VSCodeの候補表示を有効にするために型を明示した。

部分解説 Word2VecComparer クラス

class Word2VecComparer:
  """ word2vecによる比較クラス
  参考:https://qiita.com/yoppe/items/512c7c072d08c64afa7e
  """
  def __init__(self, w2v_model, num_features:int) -> None:
    self.w2v_model = w2v_model
    self.num_features = num_features

  def avg_feature_vector(self, words):
    feature_vec = np.zeros((self.num_features,), dtype='float32')
    words_len = len(words)
    for word in words:
      if word in self.w2v_model.key_to_index:
        feature_vec = np.add(feature_vec, self.w2v_model[word])
      else:
        # 辞書にない言葉はスルー。
        words_len -= 1
    if words_len > 0:
      feature_vec = np.divide(feature_vec, words_len)
    return feature_vec

  def sentence_similarity(self, words_1, words_2):
    sentence_1_avg_vector = self.avg_feature_vector(words_1)
    sentence_2_avg_vector = self.avg_feature_vector(words_2)

    # すべてゼロのベクターは spatial.distance.cosineでnanを返すはずなのに
    # 返さないケースがあったのでチェック
    if np.all(sentence_1_avg_vector==0) or np.all(sentence_2_avg_vector==0):
      return np.nan

    # 1からベクトル間の距離を引いてあげることで、コサイン類似度を計算
    return 1 - spatial.distance.cosine(
      sentence_1_avg_vector, sentence_2_avg_vector)

下記記事を参考にして書いた、複数の単語の組み合わせの類似度を算出するクラスです。

元の処理と異なっている点は

  • 初期化時にgensimモデル(KeyedVectors)オブジェクトと次元の数値を取るように変更
  • 分かち書きだけでなく、重複削除や全角変換など処理が多くなったので、「文章を受けてMeCab分かち書き」ではなく「単語のリストを受ける」ように変更
  • 辞書にない言葉はなかったものとして類似度を算出するように変更
  • すべてゼロのベクターなのにspatial.distance.cosine の戻り値がnanを返さなかったのでチェック処理を追加

部分解説 WordSeparatedCategory クラス

class WordSeparatedCategory:
  """分かち書きや階層構造で単語に分割されたカテゴリーのクラス
  """

  split_pattern = re.compile(r'[>・、/\¥&]')
  """階層化しているカテゴリーを分割するためのパターン
  """

  mecab = MeCab.Tagger(ipadic.MECAB_ARGS + ' -Owakati')
  """ipadicと分かち書きを設定したMeCab
  """

  def __init__(self, category_id:str, category_path:str) -> None:
    """[summary]
    カテゴリーIDとカテゴリーパス(親のカテゴリーも含めた階層構造)で初期化する。
    カテゴリーパスはパスの区切りとMeCabで各単語を配列にする。

    Args:
      id (str): カテゴリーのID
      category_path (str): カテゴリーのパス
    """
    self.category_id = category_id
    self.category_path = category_path
    parts = []

    # 全角に変換する。
    zen = jaconv.h2z(category_path)
    # 区切り文字で分割する。word2vecの辞書に存在しやすいよう、特殊記号も含めて分割する。
    words = [words for words in WordSeparatedCategory.split_pattern.split(
      zen)]

    # 分割した語句をMeCabで分かち書きする。
    for word in words:
      parts.extend(
        WordSeparatedCategory.mecab.parse(word).replace(
          ' \n', '').split())

    # 重複の削除
    self.path_words = set(parts)

分かち書きや階層構造で単語に分割されたカテゴリーのクラスです。

以下のインスタンス変数を持ちます。

  • カテゴリーID
  • カテゴリーパス(階層の記号などをそのままにしたカテゴリー名称)
  • パスの単語リスト(set)

以下のようにカテゴリーを単語レベルに分割し、word2vecでの分析のための準備をします。

a=WordSeparatedCategory('00000','ファッション>レディースファッション>アウター>ピーコート')
print(a.path_words) #=>{'レディース', 'ピーコート', 'ファッション', 'アウター'}
a =WordSeparatedCategory('00000','食品・飲み物>ジュース、清涼飲料水')
print(a.path_words) #=>{'清涼飲料水', '食品', 'ジュース', '飲み物'}

部分解説 CompareResult クラス

class CompareResult:
  """比較結果クラス。比較対象2つと比較結果の値を持つ
  """
  def __init__(self, a:WordSeparatedCategory, b:WordSeparatedCategory) -> None:
    self.a:WordSeparatedCategory = a
    self.b:WordSeparatedCategory = b
    self.similarity:float = 0

比較結果のクラスです。

以下のインスタンス変数を持ちます。

  • 比較対象のカテゴリー A
  • 比較対象のカテゴリー B
  • 比較結果の数値

入れ物のクラスですね。特に処理は入れていません。

部分解説 関数 setup_logger

def setup_logger(
    name:str, logfile:str=f'{os.path.splitext(__file__)[0]}-log.txt'):
  """ロガーのセットアップ
  参考: https://qiita.com/shotakaha/items/0fa2db1dc8253c83e2bb

  Args:
    name (str): ロガー名
    logfile (str, optional): 出力されるログファイル名. Defaults to 'debuglog.txt'.

  """

  logger:Logger = getLogger(name)
  logger.setLevel(DEBUG)

  fh:FileHandler = FileHandler(logfile)
  fh.setLevel(DEBUG)
  fh_formatter = Formatter(
    '%(asctime)s - %(levelname)s - %(filename)s \
        - %(name)s - %(funcName)s - %(message)s')
  fh.setFormatter(fh_formatter)

  ch:StreamHandler = StreamHandler()
  ch.setLevel(DEBUG)
  ch_formatter = Formatter(
    '%(asctime)s - %(levelname)s - %(message)s', '%Y-%m-%d %H:%M:%S')
  ch.setFormatter(ch_formatter)

  logger.addHandler(fh)
  logger.addHandler(ch)

ロガーを初期化する関数です。 下記の記事を参考にして実装しました。

qiita.com

今まで print でログをしていましたが、Logger というものを知り、勉強のために使ってみました。 ログのファイル出力が簡単にできるのは嬉しいです。

部分解説 関数 conv_csv_to_ws_categories

def conv_csv_to_ws_categories(path:str):
  """特定フォーマットのCSVを読み込んで WordSeparatedCategory オブジェクトのリストに変換する。

  Args:
    path (str): 開く対象のCSVのパス

  Returns:
    CategoryParts[]: WordSeparatedCategory オブジェクトのリスト
  """

  getLogger(__name__).debug('Read csv %s', path)

  with open(path, newline='', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    return [WordSeparatedCategory(
      row['CategoryId'], row['CategoryPath']) for row in reader]

特定フォーマットのCSVを読み込んで WordSeparatedCategory オブジェクトのリストに変換します。

特定フォーマットとは以下の列を持つCSVです。

  • CategoryId
  • CategoryPath

この関数では csv.DictReader を使って辞書でアクセスすることを試しました。

部分解説 関数 conv_results_to_dict

def conv_results_to_dict(
    results:list[CompareResult],
    key:Callable[[CompareResult], WordSeparatedCategory]
    ) -> dict[WordSeparatedCategory, list[CompareResult]]:
  """カテゴリーペアの辞書化。
  各要素にCompareResult形式の要素を持つリストを元に、
  keyをCompareResultのいずれかの要素、
  valueをCompareResultのリストの形式の辞書を返却する。

  Args:
    results (list): 各要素に CompareResult の要素を持つリスト
    key (lambda): 辞書のキーとなるオブジェクトを取得する関数
  """
  result_dict = {}

  # Aに対してBの候補を表示するために、Aカテ:Bカテリスト の構造辞書を構築する
  for pair in results:
    result_dict.setdefault(key(pair), []).append(pair)

  # ソートして先頭10件のみ出力する。先頭が1の場合(完全一致)は先頭のみとする。
  for k, v in result_dict.items():
    v.sort(key=lambda x:
      0 if np.isnan(x.similarity) else x.similarity, reverse=True)
    result_dict[k] = [v[0]] if v[0].similarity == 1 else v[:10]

  return result_dict

各要素にCompareResult形式の要素を持つリストを元に、keyをCompareResultのWordSeparatedCategoryのいずれか、valueをCompareResultのリストの形式の辞書を返却する関数です。

これはカテゴリーの一致を考える上で、粒度が比較するECによって違うことで1対1に決まらない事が多いため、片方のカテゴリーに対して候補のカテゴリーリストを出力するために実装しました。

部分解説 関数 main

def main():
  """メイン処理
  """
  # 定数
  model_path = r'chive-1.2-mc5_gensim\chive-1.2-mc5.kv'
  a_name = 'auPayMarket'
  b_name = 'yahooShopping'
  csv_path_a = f'{a_name}_category.csv'
  csv_path_b = f'{b_name}_category.csv'
  csv_path_result_a2b = f'result_{a_name}-{b_name}.csv'
  csv_path_result_b2a = f'result_{b_name}-{a_name}.csv'

  # init logger
  setup_logger(__name__)
  logger = getLogger(__name__)
  logger.debug('---カテゴリー類似率計算---')
  logger.debug('Word2Vecモデル:%s', model_path)
  logger.debug('比較カテゴリーA:%s', csv_path_a)
  logger.debug('比較カテゴリーB:%s', csv_path_b)

  # CSVの読み込み
  with Timer(True):
    cats_a = conv_csv_to_ws_categories(csv_path_a)
  with Timer(True):
    cats_b = conv_csv_to_ws_categories(csv_path_b)

  # gensimモデルの読み込み
  logger.debug('Load model:%s', model_path)
  with Timer(True):
    model = gensim.models.KeyedVectors.load(model_path)
    print(type(model))
  comparer = Word2VecComparer(model, 300)

  # カテゴリー比較ペアを作る(AカテとBカテ全組み合わせ)
  results = [CompareResult(cat_a, cat_b) \
      for cat_a in cats_a for cat_b in cats_b]

  logger.debug('Calc similarity count:%s', len(results))
  with Timer(True):
    for result in results:
      result.similarity = comparer.sentence_similarity(
        result.a.path_words, result.b.path_words)

  # CSV出力
  # AのカテゴリーからBが決まるパターンとその逆を考慮し、2通り出力する
  logger.debug('Write csv: A to B')
  with Timer(True):
    dict_a2bs = conv_results_to_dict(results, lambda result: result.a)
    with open(csv_path_result_a2b, 'w', encoding='utf-8', newline='') as f:
      writer = csv.writer(f)
      writer.writerow([
        f'id-{a_name}', f'category-{a_name}',
        f'id-{b_name}', f'category-{b_name}',
        'similarity'])
      for k, v in dict_a2bs.items():
        writer.writerows([[
            k.category_id, k.category_path,
            p.b.category_id, p.b.category_path,
            p.similarity] for p in v])

  logger.debug('Write csv: B to A')
  with Timer(True):
    dict_b2as = conv_results_to_dict(results, lambda result: result.b)
    with open(csv_path_result_b2a, 'w', encoding='utf-8', newline='') as f:
      writer = csv.writer(f)
      writer.writerow([
        f'id-{b_name}', f'category-{b_name}',
        f'id-{a_name}', f'category-{a_name}',
        'similarity'])
      for k, v in dict_b2as.items():
        writer.writerows([[
            k.category_id, k.category_path,
            p.a.category_id, p.a.category_path,
            p.similarity] for p in v])

  logger.debug('---カテゴリー類似率計算 完了---')

メイン処理です。

全体的に Timerクラスを活用して実行時間をログするようにしています。

流れ

  • CSVよんでカテゴリーのリストを取得する・・・①
  • gensim モデル読む(これが80秒前後かかる)
  • ①の2つのリストを全組み合わせで CompareteResult オブジェクトリストを作る・・・②
  • ②をすべてループして類似度を計算する。
  • CSV出力する

CSV出力は2通り出力するのですが、うまいこと関数化ができず、冗長な書き方になってしまっています。

以上がスクリプトの解説です。

次回はマッチさせた結果について考えてみようと思います。