Polarsを用いた高速データ処理:Pandasからの移行と機械学習前処理の最適化
導入
近年、データ処理のパフォーマンスは機械学習プロジェクトにおいて重要な課題の一つとなっています。Pythonのデータ分析ライブラリとして広く利用されているPandasは非常に強力ですが、大規模データセットの処理においてはメモリ使用量や処理速度がボトルネックとなる場合があります。
本記事では、Rustベースで構築された高速なデータ処理ライブラリであるPolarsに焦点を当てます。Polarsは、最新のマルチコアCPUを最大限に活用し、Lazy評価やOptimized Query Engineなどの機能により、Pandasをはるかに上回るパフォーマンスを発揮することが期待されています。Web開発エンジニアの皆様が、AI/MLにおけるデータ前処理やバックエンドでのデータ集計処理において、Polarsを導入し、既存のPandasコードを最適化するための具体的な手順と実践的なコード例を提供いたします。
この記事を通じて、読者の皆様は以下の知識とスキルを習得できます。
- Polarsの基本的なDataFrame操作とPandasとの主要な違い
- 機械学習の前処理タスクにおけるPolarsの適用方法
- PandasからPolarsへのコード移行をスムーズに行うためのヒント
- PolarsのLazy APIの活用による大規模データ処理の最適化
環境構築
本チュートリアルでは、Pythonの仮想環境を推奨します。以下の手順で環境を構築してください。
1. Python仮想環境の作成とアクティベート
まず、任意の作業ディレクトリでPythonの仮想環境を作成し、アクティベートします。
# 仮想環境の作成
python -m venv polars_env
# 仮想環境のアクティベート (macOS/Linux)
source polars_env/bin/activate
# 仮想環境のアクティベート (Windows)
.\polars_env\Scripts\activate
2. 必要なライブラリのインストール
仮想環境をアクティベートした後、本チュートリアルで利用するPolars、Scikit-learn、Pandas(比較用)、NumPyをインストールします。
pip install polars scikit-learn pandas numpy
これにより、Polarsを用いたデータ処理と機械学習前処理を実践するための環境が整いました。
実装手順(チュートリアル本体)
ここでは、実際のデータセットを用いてPolarsの基本操作と機械学習の前処理への適用方法を解説します。例として、仮想的な顧客データを使用します。
1. データセットの準備
まず、サンプルデータとしてCSVファイルを生成します。このデータは、顧客ID、年齢、性別、収入、購買回数、カテゴリカルな評価(A, B, C)を含みます。
import pandas as pd
import numpy as np
import os
# データ生成
np.random.seed(42)
data_size = 100000
df_gen = pd.DataFrame({
'customer_id': range(data_size),
'age': np.random.randint(18, 70, size=data_size),
'gender': np.random.choice(['Male', 'Female', 'Other'], size=data_size),
'income': np.random.randint(30000, 150000, size=data_size),
'purchase_count': np.random.randint(0, 50, size=data_size),
'product_category': np.random.choice(['Electronics', 'Books', 'Home', 'Apparel'], size=data_size),
'satisfaction_level': np.random.choice(['A', 'B', 'C', None], size=data_size, p=[0.3, 0.4, 0.2, 0.1])
})
# 欠損値の導入
num_missing_income = int(data_size * 0.02)
missing_indices_income = np.random.choice(data_size, num_missing_income, replace=False)
df_gen.loc[missing_indices_income, 'income'] = np.nan
num_missing_age = int(data_size * 0.01)
missing_indices_age = np.random.choice(data_size, num_missing_age, replace=False)
df_gen.loc[missing_indices_age, 'age'] = np.nan
# CSVファイルとして保存
file_path = 'customer_data.csv'
df_gen.to_csv(file_path, index=False)
print(f"'{file_path}' を生成しました。")
上記のコードを実行すると、customer_data.csv
というファイルが生成されます。
2. Polarsの基本的なデータ操作
Polarsでのデータ読み込み、表示、基本的な操作方法を解説します。
データの読み込みと表示
Polarsでは pl.read_csv()
関数を使用してCSVファイルを読み込みます。
import polars as pl
# CSVファイルをPolars DataFrameとして読み込み
df_pl = pl.read_csv('customer_data.csv')
# 最初の5行を表示
print("--- Polars DataFrameの最初の5行 ---")
print(df_pl.head())
# DataFrameのスキーマ(データ型)を表示
print("\n--- Polars DataFrameのスキーマ ---")
print(df_pl.schema)
コード解説:
* import polars as pl
でPolarsライブラリを pl
というエイリアスでインポートします。
* pl.read_csv()
は、指定されたCSVファイルを読み込み、Polarsの DataFrame
オブジェクトを生成します。
* df_pl.head()
は、DataFrameの先頭行を表示します。
* df_pl.schema
は、DataFrameの各列の名前とデータ型(スキーマ)を表示します。PolarsはRustの型システムに似た厳密な型を持ち、Int64
, Utf8
, Float64
などで表現されます。
列の選択とフィルタリング
特定の列を選択したり、条件に基づいて行をフィルタリングしたりする方法を示します。
# 'customer_id', 'age', 'income' 列を選択
print("\n--- 選択した列 ---")
print(df_pl.select(['customer_id', 'age', 'income']).head())
# ageが30歳以上かつincomeが50000以上の顧客をフィルタリング
filtered_df_pl = df_pl.filter((pl.col('age') >= 30) & (pl.col('income') >= 50000))
print("\n--- フィルタリング後のDataFrameの最初の5行 ---")
print(filtered_df_pl.head())
print(f"フィルタリング前の行数: {df_pl.height}, フィルタリング後の行数: {filtered_df_pl.height}")
コード解説:
* df_pl.select(['col1', 'col2'])
は、指定された列のみを持つ新しいDataFrameを返します。
* df_pl.filter(条件)
は、条件を満たす行のみを選択します。Polarsでは pl.col()
を使用して列を参照し、&
(and) や |
(or) で複数の条件を組み合わせます。
集計操作
データのグループ化と集計を行います。ここでは、性別ごとの平均収入と平均購買回数を計算します。
# 性別ごとの平均収入と平均購買回数を計算
grouped_df_pl = df_pl.group_by('gender').agg(
pl.col('income').mean().alias('average_income'),
pl.col('purchase_count').mean().alias('average_purchase_count')
)
print("\n--- 性別ごとの集計結果 ---")
print(grouped_df_pl)
コード解説:
* df_pl.group_by('column_name')
で指定された列でDataFrameをグループ化します。
* agg()
メソッド内で集計関数を適用します。pl.col('column_name').mean()
で列の平均値を計算し、.alias('new_column_name')
で結果の列名を指定します。
3. 機械学習前処理への適用
Polarsを使って、欠損値処理、データ型変換、カテゴリカル変数エンコーディングなど、機械学習モデルの訓練に必要な前処理を行います。
欠損値処理
income
列と age
列の欠損値を平均値で補完し、satisfaction_level
列の欠損値を最頻値で補完します。
# 欠損値の補完
# 平均値を計算
mean_income = df_pl.select(pl.col('income').mean()).item()
mean_age = df_pl.select(pl.col('age').mean()).item()
# 最頻値を計算
mode_satisfaction = df_pl.group_by('satisfaction_level').agg(pl.count()).sort('count', descending=True).select('satisfaction_level').item(0)
df_processed_pl = df_pl.with_columns([
pl.col('income').fill_null(mean_income).alias('income_filled'),
pl.col('age').fill_null(mean_age).alias('age_filled'),
pl.col('satisfaction_level').fill_null(mode_satisfaction).alias('satisfaction_level_filled')
])
print("\n--- 欠損値補完後のデータ(一部) ---")
print(df_processed_pl.select(['income', 'income_filled', 'age', 'age_filled', 'satisfaction_level', 'satisfaction_level_filled']).head())
# 補完後の欠損値の確認
print("\n--- 補完後の欠損値カウント ---")
print(df_processed_pl.select([
pl.col('income_filled').is_null().sum().alias('income_filled_null_count'),
pl.col('age_filled').is_null().sum().alias('age_filled_null_count'),
pl.col('satisfaction_level_filled').is_null().sum().alias('satisfaction_level_filled_null_count')
]))
コード解説:
* df_pl.select(pl.col('column_name').mean()).item()
を使用して、特定の列の平均値を取得します。item()
は単一の値をDataFrameから抽出します。
* group_by().agg(pl.count()).sort().select().item(0)
の組み合わせで最頻値を効率的に取得します。
* with_columns()
メソッドは、新しい列を追加したり、既存の列を変換したりするために使用します。複数の変換を一度に適用できます。
* fill_null(value)
は、列の欠損値(null)を指定された値で埋める関数です。
カテゴリカル変数のエンコーディング
機械学習モデルは数値データを必要とします。ここでは、gender
と product_category
列をOne-Hotエンコーディングし、satisfaction_level_filled
列をLabel Encodingします。
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
import numpy as np
# One-Hot Encoding (gender, product_category)
# Polars DataFrameからNumpy配列に変換
gender_data = df_processed_pl.select('gender').to_numpy()
product_category_data = df_processed_pl.select('product_category').to_numpy()
ohe = OneHotEncoder(sparse_output=False, handle_unknown='ignore') # sparse_output=FalseでNumPy配列を生成
# fit_transform は2D配列を期待するため reshape(-1, 1) を適用
gender_encoded = ohe.fit_transform(gender_data.reshape(-1, 1))
product_category_encoded = ohe.fit_transform(product_category_data.reshape(-1, 1))
# エンコードされたデータをPolars DataFrameに結合
# まず、新しいDataFrameを作成
gender_df_pl = pl.DataFrame(
gender_encoded,
schema=[f'gender_{cat}' for cat in ohe.categories_[0]]
)
product_category_df_pl = pl.DataFrame(
product_category_encoded,
schema=[f'product_category_{cat}' for cat in ohe.categories_[0]]
)
# 元のDataFrameに結合
df_processed_pl = pl.concat([
df_processed_pl,
gender_df_pl,
product_category_df_pl
], how='horizontal')
# Label Encoding (satisfaction_level_filled)
satisfaction_data = df_processed_pl.select('satisfaction_level_filled').to_numpy()
le = LabelEncoder()
satisfaction_encoded = le.fit_transform(satisfaction_data.flatten()) # flatten()で1D配列に変換
# 新しい列として追加
df_processed_pl = df_processed_pl.with_columns(
pl.Series(name='satisfaction_level_encoded', values=satisfaction_encoded)
)
print("\n--- エンコーディング後のデータ(一部) ---")
print(df_processed_pl.select([
'gender', 'gender_Male', 'gender_Female', 'gender_Other',
'product_category', 'product_category_Electronics', 'product_category_Books',
'satisfaction_level_filled', 'satisfaction_level_encoded'
]).head())
print(f"LabelEncoderのクラス: {le.classes_}")
コード解説:
* Polars自体にはScikit-learnのような直接的なエンコーダ機能はありませんが、to_numpy()
を使用してデータをNumPy配列に変換し、Scikit-learnのエンコーダを適用できます。
* OneHotEncoder(sparse_output=False)
は、One-Hotエンコーディングされた結果を密なNumPy配列として返します。
* LabelEncoder()
は、カテゴリカルな文字列を0から始まる整数に変換します。
* エンコーディング後のNumPy配列は pl.DataFrame()
でPolars DataFrameに変換し、pl.concat(..., how='horizontal')
で元のDataFrameに横方向に結合します。
* pl.Series(name='...', values=...)
を使用して、NumPy配列から直接新しいPolars Series(列)を作成し、with_columns()
でDataFrameに追加します。
特徴量エンジニアリング
既存の列から新しい特徴量を作成します。ここでは、income
と purchase_count
を基に income_per_purchase
を作成します。
df_processed_pl = df_processed_pl.with_columns(
(pl.col('income_filled') / (pl.col('purchase_count') + 1)).alias('income_per_purchase') # +1はゼロ除算を避けるため
)
print("\n--- 特徴量エンジニアリング後のデータ(一部) ---")
print(df_processed_pl.select(['income_filled', 'purchase_count', 'income_per_purchase']).head())
コード解説:
* with_columns()
を再度使用し、既存の列に対する算術演算の結果を新しい列として追加します。Polarsでは、列は pl.col()
オブジェクトとして扱われ、そのまま演算できます。
4. 動作確認とパフォーマンス比較
PolarsとPandasで同じデータ処理タスクを行い、パフォーマンスを比較します。ここでは、大規模データセットでの集計処理を例とします。
import time
# 大規模データセットの生成 (例: 1000万行)
large_data_size = 10_000_000
large_df_gen = pd.DataFrame({
'group_key': np.random.randint(0, 1000, size=large_data_size),
'value': np.random.rand(large_data_size) * 100
})
large_file_path = 'large_data.csv'
large_df_gen.to_csv(large_file_path, index=False)
print(f"'{large_file_path}' を生成しました。")
# Pandasでの処理
print("\n--- Pandasでの処理 ---")
start_time = time.time()
df_pd_large = pd.read_csv(large_file_path)
grouped_pd = df_pd_large.groupby('group_key')['value'].mean().reset_index()
end_time = time.time()
print(f"Pandasでの処理時間: {end_time - start_time:.4f} 秒")
# print(grouped_pd.head()) # 結果が多いため表示は省略
# Polarsでの処理
print("\n--- Polarsでの処理 ---")
start_time = time.time()
df_pl_large = pl.read_csv(large_file_path)
grouped_pl = df_pl_large.group_by('group_key').agg(
pl.col('value').mean().alias('value_mean')
)
end_time = time.time()
print(f"Polarsでの処理時間: {end_time - start_time:.4f} 秒")
# print(grouped_pl.head()) # 結果が多いため表示は省略
# ファイルのクリーンアップ
os.remove(large_file_path)
os.remove('customer_data.csv')
print("\n生成されたファイルを削除しました。")
コード解説:
* time
モジュールを使用して、処理の前後に時刻を記録し、その差分から処理時間を計測します。
* 同じlarge_data.csv
に対して、PandasとPolarsでそれぞれCSV読み込みとグループ化・平均値計算を実行し、処理時間を比較します。一般的に、Polarsの方が高速であることが確認できます。
応用例・発展的な内容
Polarsはここで紹介した以外にも、多くの高度な機能を備えています。
- Lazy API:
pl.scan_csv()
を使用することで、Lazy評価(処理をすぐに実行せず、必要な時に最適化されたクエリプランで実行)を活用できます。これにより、メモリに乗り切らないような非常に大規模なデータセットでも効率的に処理できます。 - Arrow/Parquetフォーマットの利用: PolarsはApache ArrowやParquetといった列指向の高性能データフォーマットと高い親和性を持っています。これらのフォーマットを利用することで、データの読み書きをさらに高速化できます。
- Pythonエコシステムとの連携: Polars DataFrameは
to_pandas()
メソッドでPandas DataFrameに、to_numpy()
でNumPy配列に簡単に変換できるため、既存のScikit-learnやPyTorchなどのライブラリとスムーズに連携できます。
これらの機能は、より複雑なデータパイプラインや大規模な機械学習プロジェクトにおいて、Polarsの真価を発揮する鍵となります。
トラブルシューティング/FAQ
Q1: PandasコードをPolarsに移行する際の注意点は何ですか?
A1: PolarsはPandasと似たAPIを持っていますが、いくつかの重要な違いがあります。特に、列の選択には pl.col()
を使用する、チェーンメソッドでの操作が推奨される、データ型の扱いがより厳密である、といった点に注意が必要です。df[col]
のような直接的な列アクセスよりも、df.select(pl.col(col))
や df.filter(pl.col(col) > value)
といった形式が推奨されます。
Q2: Polarsでメモリ不足エラーが発生しました。どうすればよいですか?
A2: PolarsはPandasよりもメモリ効率が良いですが、非常に大規模なデータセットを扱う場合はそれでもメモリ不足になることがあります。pl.scan_csv()
や pl.scan_parquet()
といったLazy APIを使用して、データ全体を一度にメモリにロードせず、最適化されたチャンク処理を検討してください。また、不要な中間DataFrameを削除したり、データ型をより小さいものに変更したりすることも有効です。
Q3: Polarsで特定の操作が遅いと感じます。最適化のヒントはありますか?
A3: LazyFrame
を積極的に使用することで、Polarsのクエリオプティマイザが最大限に効果を発揮します。また、apply()
や map_elements()
のようなPythonの関数を適用する操作は、Rustの最適化パスを通らないため、可能な限り避けてください。Polarsネイティブの式 (pl.col().expression()
) を利用することが、パフォーマンス向上の鍵となります。
まとめ
本記事では、高速データ処理ライブラリPolarsについて、その基本的な操作から機械学習の前処理への応用までを実践的なコード例とともに解説しました。Polarsは、その優れたパフォーマンスと効率性により、大規模データセットを扱うAI/MLプロジェクトやWebアプリケーションのバックエンドにおけるデータ処理において、非常に強力なツールとなり得ます。
Pandasからの移行には学習コストが伴いますが、PolarsのLazy APIや最適化された実行エンジンを活用することで、処理速度の向上とメモリ使用量の削減という大きなメリットを享受できます。ぜひ、ご自身のプロジェクトにPolarsを導入し、データ処理の課題解決に役立てていただければ幸いです。