Python Dask入門 小さなマシンで大きなデータを扱う最初の一歩

Python Dask入門 小さなマシンで大きなデータを扱う最初の一歩

Pandas が登場してから Python はデータ分析の事実上の標準言語になりました。

しかし、データ量が数千万行を超えると途端に RAM が足りない・処理が遅い という現実にぶつかります。

クラスタ環境に逃げる選択肢もありますが、準備・コスト・学習コストを考えると “ちょっと重い” だけのデータに対してはオーバーキルです。

そこで登場するのが Dask

以下 3 点が現場エンジニアにとって大きな魅力です。

  • API 類似度 90 %: 既存の Pandas/NumPy 知識をそのまま使える。
  • 遅延評価 × 並列化: PC のコア数を自動で使い切り、メモリフットプリントを抑制。
  • スケールラインが滑らか: ローカル PC → 分散クラスタへ“ほぼ同じコード”で移行可能。

今回は「大量 CSV を集計する」という 典型的に重くなる処理 を通して、Dask がどのように恩恵をもたらすかを見てみます。

Pandas と Dask の比較

まずは、それぞれの特徴を整理し、違いを明確に把握しましょう。

Pandas Dask データフレーム
主な用途 単一スレッドでの高速なインメモリ処理 並列・分散処理を抽象化し、大規模データに対応
メモリ利用の仕組み データ全体を一度にRAMへ読み込む データをチャンク単位で分割し、断片的にRAMを使用
計算の実行タイミング 即時評価(呼び出しと同時に計算) 遅延タスクグラフを構築し、compute()で実行
スケジューラ OS 標準のスケジューラに依存 スレッド・プロセス・分散クラスタから選択可能

上の表に示したように、Pandas は小~中規模データの分析に向いています。

データをすべて一度にメモリ上に読み込むため(メモリ利用の仕組み)、呼び出しと同時に計算が始まる「即時評価」の性質を持ちます。

そのため、操作後すぐに結果を確認でき、シンプルな API で手軽にデータ操作を進められるのが大きな利点です。

 

一方、Dask データフレーム は大量データや複雑な計算を扱う場面で威力を発揮します。

データを小さなチャンクに分割し断片的にメモリへ読み込むことで、限られた RAM 上でも処理が可能です。

また、遅延タスクグラフを構築し、compute() を呼び出すタイミングで一括実行するため、中間生成物の無駄を省きつつ高速に処理できます。

さらに、実行環境に応じてスレッド・プロセス・分散クラスタを自由に選択できるため、開発段階から本番環境まで同じコードでスケールさせやすい柔軟性が特徴です。

 

Daskのインストール

Dask を使い始めるには、Python 環境とパッケージ管理ツールが整っていれば、たった1行のコマンドでインストールできます。

インストール方法 コマンド
Conda を使う場合 conda install -c conda-forge dask
pip を使う場合 pip install dask

 

サンプルデータ

次のような店舗別の売上データを利用します。

  • 店舗数:200
  • 各店舗のレコード数:10万行
  • 変数:
    • store: 店舗番号
    • product: 商品ID
    • date: 日付
    • sales: 売上金額

全てのデータを合算すると、合計2,000万行のデータです。

店舗別にCSVファイルに格納されており、ファイル名は以下のようにdaily_sales_の後に店舗番号が付いたものになっています。

daily_sales_[店舗番号].csv

 

このようなCSVファイルが200個あるということです。

このデータは以下からダウンロードできます。

CSVファイル一式
https://www.salesanalytics.co.jp/x40x

 

処理時間比較

今から、店舗別の200のCSVファイルを読み込み、全データを結合し、月別に集計していきます。

以下の3パターンで実施し、処理時間を比べます。

  • Pandasで処理
  • Daskで処理
  • Pandas の処理の中で Dask Delayed を使う

 

Dask Delayed を使うと、既存の Pandas コードを大きく書き換えることなく、関数単位で「遅延タスク」に変換して並列実行できるようになります。具体的には、 @dask.delayed デコレータ(もしくは delayed(...))を適用し、処理の呼び出し時には計算を行わずにタスクグラフを構築し、compute()が呼び出されたタイミングで一括実行します。

 

 Pandasで処理

以下、コードです。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import pandas as pd
import time
import glob
# 処理時間の計測開始
start = time.perf_counter()
# CSVファイル一覧を取得
files = glob.glob("data/daily_sales_*.csv")
# for文でファイルを逐次読み込み
dfs = [] # DataFrameを格納するリスト
for f in files:
df = pd.read_csv(f, parse_dates=['date'])
dfs.append(df)
pdf = pd.concat(dfs)
# date列から年月の列を作成
pdf['year_month'] = pdf['date'].dt.strftime('%Y-%m')
# date列をインデックスに設定
pdf = pdf.set_index('date')
# データを店舗別商品別の年月別に集計する処理
monthly = pdf.groupby([
'store',
'product',
'year_month'
])['sales'].sum().reset_index()
# 年月でソート
monthly = monthly.sort_values(['store', 'product', 'year_month'])
# 結果を表示
print(monthly.head())
# 処理時間の計算
pd_time = time.perf_counter() - start
# 処理時間の表示
print(f"Processing Time (seconds) : {pd_time:.2f}s")
import pandas as pd import time import glob # 処理時間の計測開始 start = time.perf_counter() # CSVファイル一覧を取得 files = glob.glob("data/daily_sales_*.csv") # for文でファイルを逐次読み込み dfs = [] # DataFrameを格納するリスト for f in files: df = pd.read_csv(f, parse_dates=['date']) dfs.append(df) pdf = pd.concat(dfs) # date列から年月の列を作成 pdf['year_month'] = pdf['date'].dt.strftime('%Y-%m') # date列をインデックスに設定 pdf = pdf.set_index('date') # データを店舗別商品別の年月別に集計する処理 monthly = pdf.groupby([ 'store', 'product', 'year_month' ])['sales'].sum().reset_index() # 年月でソート monthly = monthly.sort_values(['store', 'product', 'year_month']) # 結果を表示 print(monthly.head()) # 処理時間の計算 pd_time = time.perf_counter() - start # 処理時間の表示 print(f"Processing Time (seconds) : {pd_time:.2f}s")
import pandas as pd
import time
import glob

# 処理時間の計測開始
start = time.perf_counter()  

# CSVファイル一覧を取得
files = glob.glob("data/daily_sales_*.csv")

# for文でファイルを逐次読み込み
dfs = []  # DataFrameを格納するリスト
for f in files:
    df = pd.read_csv(f, parse_dates=['date'])
    dfs.append(df)
pdf = pd.concat(dfs)

# date列から年月の列を作成
pdf['year_month'] = pdf['date'].dt.strftime('%Y-%m')

# date列をインデックスに設定
pdf = pdf.set_index('date')

# データを店舗別商品別の年月別に集計する処理
monthly = pdf.groupby([
    'store', 
    'product', 
    'year_month'
])['sales'].sum().reset_index()

# 年月でソート
monthly = monthly.sort_values(['store', 'product', 'year_month'])

# 結果を表示
print(monthly.head())

# 処理時間の計算
pd_time = time.perf_counter() - start

# 処理時間の表示
print(f"Processing Time (seconds) : {pd_time:.2f}s")

 

このコードの流れです。

  • タイマーをスタートして、処理時間の計測を開始
  • glob.glob() で対象の CSV ファイル一覧を取得
  • for ループ内で pd.read_csv() を使って各 CSV を逐次読み込みし、リストに格納
  • pd.concat() でリスト内のデータフレームを結合
  • date 列から strftime('%Y-%m') により「年-月」を表す year_month 列を作成
  • set_index()date 列をインデックスに設定
  • groupby+sum() で店舗・商品・年月ごとの売上を集計し、reset_index()sort_values() で整形
  • 集計結果の先頭を表示し、タイマーを止めて全体の処理時間を秒単位で出力

 

以下、実行結果です。

   store  product year_month        sales
0      0        0    2023-01  1094.897811
1      0        0    2023-02  1096.106691
2      0        0    2023-03  1277.466556
3      0        0    2023-04  1417.308542
4      0        0    2023-05  1349.660263
Processing Time (seconds) : 148.55s

 

 Daskで処理

以下、コードです。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import dask.dataframe as dd
import time
# 処理時間の計測開始
start = time.perf_counter()
# CSVファイルをDaskデータフレームとして読み込む
pdf = dd.read_csv(
"data/daily_sales_*.csv",
parse_dates=['date'],
)
# 年・月の列を追加
pdf['year_month'] = pdf['date'].dt.to_period('M').dt.to_timestamp()
# 月次集計(年-月ごとに)
dd_monthly = pdf.groupby([
'store',
'product',
'year_month'
])['sales'].sum().to_frame('sales').reset_index()
# compute()メソッドを使用しPandasデータフレームに変換
monthly = dd_monthly.compute()
# 結果を表示
print(monthly.head())
# 処理時間の計算
dd_time = time.perf_counter() - start
# 処理時間の表示
print(f"Processing Time (seconds) : {dd_time:.2f}s")
import dask.dataframe as dd import time # 処理時間の計測開始 start = time.perf_counter() # CSVファイルをDaskデータフレームとして読み込む pdf = dd.read_csv( "data/daily_sales_*.csv", parse_dates=['date'], ) # 年・月の列を追加 pdf['year_month'] = pdf['date'].dt.to_period('M').dt.to_timestamp() # 月次集計(年-月ごとに) dd_monthly = pdf.groupby([ 'store', 'product', 'year_month' ])['sales'].sum().to_frame('sales').reset_index() # compute()メソッドを使用しPandasデータフレームに変換 monthly = dd_monthly.compute() # 結果を表示 print(monthly.head()) # 処理時間の計算 dd_time = time.perf_counter() - start # 処理時間の表示 print(f"Processing Time (seconds) : {dd_time:.2f}s")
import dask.dataframe as dd
import time

# 処理時間の計測開始
start = time.perf_counter()

# CSVファイルをDaskデータフレームとして読み込む
pdf = dd.read_csv(
    "data/daily_sales_*.csv",
    parse_dates=['date'],
)

# 年・月の列を追加
pdf['year_month'] = pdf['date'].dt.to_period('M').dt.to_timestamp()

# 月次集計(年-月ごとに)
dd_monthly = pdf.groupby([
    'store', 
    'product', 
    'year_month'
])['sales'].sum().to_frame('sales').reset_index()

# compute()メソッドを使用しPandasデータフレームに変換
monthly = dd_monthly.compute()

# 結果を表示
print(monthly.head())

# 処理時間の計算
dd_time = time.perf_counter() - start

# 処理時間の表示
print(f"Processing Time (seconds) : {dd_time:.2f}s")

 

このコードの流れです。

  • タイマーをスタートして、処理時間の計測を開始
  • dd.read_csv() で複数の CSV ファイルをまとめて Dask データフレームとして読み込む
  • 読み込んだ日付列から「年月」を表す新しい列 year_month を追加
  • groupbysum() によって店舗・商品・年月ごとの売上合計を計算し、結果を新しい Dask データフレームにまとめる
  • compute() を呼び出して、Dask データフレームを Pandas データフレームに変換
  • 集計結果の先頭を表示し、タイマーを止めて全体の処理時間を秒数で出力

 

以下、実行結果です。

   store  product year_month        sales
0    139        0 2029-05-01  1370.741023
1    139        0 2030-12-01  1223.201196
2    139        1 2026-05-01  1683.776449
3    139        1 2030-01-01   659.329479
4    139        1 2033-09-01  1387.381915
Processing Time (seconds) : 49.23s

 

 Pandas の処理の中で Dask Delayed を使う

以下、コードです。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
import glob
import time
# 処理時間の計測開始
start = time.perf_counter()
# CSVファイル一覧を取得
files = glob.glob("data/daily_sales_*.csv")
# delayed を使用してファイル読み込みを遅延評価化
@delayed
def read_csv(filename):
return pd.read_csv(filename, parse_dates=['date'])
# 各ファイルの読み込みを遅延評価として準備
delayed_dfs = [read_csv(f) for f in files]
# すべてのデータフレームの結合を遅延評価として準備
@delayed
def combine_dfs(dfs):
pdf = pd.concat(dfs)
# date列から年月の列を作成
pdf['year_month'] = pdf['date'].dt.strftime('%Y-%m')
# date列をインデックスに設定
pdf = pdf.set_index('date')
return pdf
# 集計処理を遅延評価として準備
@delayed
def aggregate_data(pdf):
# データを店舗別商品別の年月別に集計する処理
monthly = pdf.groupby([
'store',
'product',
'year_month'
])['sales'].sum().reset_index()
# 年月でソート
monthly = monthly.sort_values(['store', 'product', 'year_month'])
return monthly
# 遅延評価の実行チェーンを構築
pdf = combine_dfs(delayed_dfs)
monthly = aggregate_data(pdf)
# 実際の計算を実行
result = monthly.compute()
# 結果を表示
print(result.head())
# 処理時間の計算
pd_dd_time = time.perf_counter() - start
# 処理時間の表示
print(f"Processing Time (seconds) : {pd_dd_time:.2f}s")
import pandas as pd import dask.dataframe as dd from dask.delayed import delayed import glob import time # 処理時間の計測開始 start = time.perf_counter() # CSVファイル一覧を取得 files = glob.glob("data/daily_sales_*.csv") # delayed を使用してファイル読み込みを遅延評価化 @delayed def read_csv(filename): return pd.read_csv(filename, parse_dates=['date']) # 各ファイルの読み込みを遅延評価として準備 delayed_dfs = [read_csv(f) for f in files] # すべてのデータフレームの結合を遅延評価として準備 @delayed def combine_dfs(dfs): pdf = pd.concat(dfs) # date列から年月の列を作成 pdf['year_month'] = pdf['date'].dt.strftime('%Y-%m') # date列をインデックスに設定 pdf = pdf.set_index('date') return pdf # 集計処理を遅延評価として準備 @delayed def aggregate_data(pdf): # データを店舗別商品別の年月別に集計する処理 monthly = pdf.groupby([ 'store', 'product', 'year_month' ])['sales'].sum().reset_index() # 年月でソート monthly = monthly.sort_values(['store', 'product', 'year_month']) return monthly # 遅延評価の実行チェーンを構築 pdf = combine_dfs(delayed_dfs) monthly = aggregate_data(pdf) # 実際の計算を実行 result = monthly.compute() # 結果を表示 print(result.head()) # 処理時間の計算 pd_dd_time = time.perf_counter() - start # 処理時間の表示 print(f"Processing Time (seconds) : {pd_dd_time:.2f}s")
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
import glob
import time

# 処理時間の計測開始
start = time.perf_counter()  

# CSVファイル一覧を取得
files = glob.glob("data/daily_sales_*.csv")

# delayed を使用してファイル読み込みを遅延評価化
@delayed
def read_csv(filename):
    return pd.read_csv(filename, parse_dates=['date'])

# 各ファイルの読み込みを遅延評価として準備
delayed_dfs = [read_csv(f) for f in files]

# すべてのデータフレームの結合を遅延評価として準備
@delayed
def combine_dfs(dfs):
    pdf = pd.concat(dfs)
    # date列から年月の列を作成
    pdf['year_month'] = pdf['date'].dt.strftime('%Y-%m')
    # date列をインデックスに設定
    pdf = pdf.set_index('date')
    return pdf

# 集計処理を遅延評価として準備
@delayed
def aggregate_data(pdf):
    # データを店舗別商品別の年月別に集計する処理
    monthly = pdf.groupby([
        'store', 
        'product', 
        'year_month'
    ])['sales'].sum().reset_index()
    # 年月でソート
    monthly = monthly.sort_values(['store', 'product', 'year_month'])
    return monthly

# 遅延評価の実行チェーンを構築
pdf = combine_dfs(delayed_dfs)
monthly = aggregate_data(pdf)

# 実際の計算を実行
result = monthly.compute()

# 結果を表示
print(result.head())

# 処理時間の計算
pd_dd_time = time.perf_counter() - start

# 処理時間の表示
print(f"Processing Time (seconds) : {pd_dd_time:.2f}s")

 

このコードの流れです。

  • タイマーをスタートして、処理時間の計測を開始
  • glob.glob() で対象の CSV ファイル一覧を取得
  • @delayed を使って CSV ファイル読み込み関数を遅延評価化
  • リスト内包表記で各ファイルの読み込みタスクを準備
  • combine_dfs 関数を遅延評価化し、全 データフレームの結合、年月の列作成、インデックス設定を準備
  • aggregate_data 関数を遅延評価化し、店舗・商品・年月別の売上集計とソート処理を準備
  • combine_dfs(delayed_dfs)aggregate_data(pdf) の呼び出しでタスクグラフを構築
  • compute() を呼び出して、遅延タスクを並列実行し結果を取得
  • 結果の先頭行を表示し、タイマーを停止、全体の処理時間を秒単位で出力

 

以下、実行結果です。

   store  product year_month        sales
0      0        0    2023-01  1094.897811
1      0        0    2023-02  1096.106691
2      0        0    2023-03  1277.466556
3      0        0    2023-04  1417.308542
4      0        0    2023-05  1349.660263
Processing Time (seconds) : 115.38s

 

 グラフで比較

最後に、以下の3パターンで実施した処理時間をグラフで比べます。

  • Pandasで処理
  • Daskで処理
  • Pandas の処理の中で Dask Delayed を使う

 

以下、コードです。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import matplotlib.pyplot as plt
# データの準備
frameworks = ['Pandas', 'Dask', 'Pandas + Dask Delayed']
times = [pd_time, dd_time, pd_dd_time]
# グラフの作成
plt.figure(figsize=(10, 5))
bars = plt.bar(frameworks, times)
# グラフの装飾
plt.title('Processing Time Comparison: Pandas vs Dask')
plt.ylabel('Processing Time (seconds)')
plt.grid(axis='y', linestyle='--', alpha=0.7)
# 棒の上に数値を表示
for bar in bars:
height = bar.get_height()
plt.text(bar.get_x() + bar.get_width()/2., height,
f'{height:.2f}s',
ha='center', va='bottom')
plt.show()
import matplotlib.pyplot as plt # データの準備 frameworks = ['Pandas', 'Dask', 'Pandas + Dask Delayed'] times = [pd_time, dd_time, pd_dd_time] # グラフの作成 plt.figure(figsize=(10, 5)) bars = plt.bar(frameworks, times) # グラフの装飾 plt.title('Processing Time Comparison: Pandas vs Dask') plt.ylabel('Processing Time (seconds)') plt.grid(axis='y', linestyle='--', alpha=0.7) # 棒の上に数値を表示 for bar in bars: height = bar.get_height() plt.text(bar.get_x() + bar.get_width()/2., height, f'{height:.2f}s', ha='center', va='bottom') plt.show()
import matplotlib.pyplot as plt

# データの準備
frameworks = ['Pandas', 'Dask', 'Pandas + Dask Delayed']
times = [pd_time, dd_time, pd_dd_time]

# グラフの作成
plt.figure(figsize=(10, 5))
bars = plt.bar(frameworks, times)

# グラフの装飾
plt.title('Processing Time Comparison: Pandas vs Dask')
plt.ylabel('Processing Time (seconds)')
plt.grid(axis='y', linestyle='--', alpha=0.7)

# 棒の上に数値を表示
for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2., height,
             f'{height:.2f}s',
             ha='center', va='bottom')
             
plt.show()

 

以下、実行結果です。

 

Dask Delayedとは?

Dask Delayed は、既存の Python 関数やスクリプトをほとんど書き換えずに「遅延評価」し、並列/分散処理できるようにする仕組みです。

ちなみに、遅延評価(Lazy Evaluation)とは、必要になるまで計算や処理を実行せず、「後回し」にしておく仕組みのことです。

“遅延タスク”への変換
普通に関数を呼び出すとすぐに処理が実行されますが、Dask Delayed では関数に @delayed デコレータ(もしくは delayed() 関数呼び出し)を付けることで、その呼び出し自体を「タスク」として記録します。実行はされず、タスクだけが積み上がるイメージです。

タスクグラフの構築
こうして作成された遅延タスクは、依存関係を持つ “ノード” として、Dask の内部にグラフ(DAG: 有向非巡回グラフ)を構築します。例えば「ファイル読み込み → 結合 → 集計」といった複数ステップの処理を、それぞれ独立したタスクとしてグラフ化できます。

一括実行と最適化
最後に .compute() を呼ぶと、Dask がタスクグラフ全体を解析し、最適な順序や並列実行のスケジュールを自動的に決定します。その結果、マルチコア/マルチプロセス、あるいはクラスタ上で並列に処理を行い、高速化やメモリ効率の向上を図れます。

既存コードとの親和性
Pandas や NumPy で書いた関数をそのまま遅延タスク化できるため、コードの大幅な書き換えが不要です。また、Dask データフレーム や Array を使わずに、必要な部分だけ Delayed で置き換えるハイブリッド運用も可能です。

まとめると、Dask Delayed は「関数呼び出しをタスク化し、タスクグラフを遅延構築→最適化→並列実行する」ことで、シンプルなコードを書いたまま大規模データ処理をスケールさせるための強力なツールです。

Dask の主な機能

今回は、すぐ使える機能ということでDask データフレームを中心にお話しをしました。

他にも幾つか便利な機能があります。以下、まとめたものです。

機能 説明
Dask DataFrame Pandas とほぼ互換の API を提供し、行・列をパーティションに分割して並列/分散実行することで、メモリを超えるサイズの表形式データも扱える。
Dask Array NumPy ndarray のサブセット API をブロックアルゴリズムで実装。大規模多次元配列や線形代数演算をコア数・クラスタ規模までスケールさせる。
Dask Bag JSON/ログなど半構造化データを対象に、mapfilterfold など Map-Reduce 風操作を並列化できるコレクション。
分散スケジューラ スレッド・プロセス(単一マシン)から Dask Distributed を用いたクラスタ実行まで、タスクスケジューラを柔軟に切り替えられる。
リアルタイムモニタリング Bokeh 製の Web ダッシュボードが自動生成され、タスクグラフの進行状況や CPU/メモリ使用量をリアルタイムに可視化できる。
Dask-ML Scikit-Learn 互換 API で前処理・学習・推論を並列/分散化し、早期停止付きハイパーパラメータ探索などもサポート。
Futures インターフェース concurrent.futures と類似した文法で Python 関数を Future 化し、クラスタ全体へ非同期スケジュールできる。
エコシステム統合 XGBoost・CuPy・RAPIDS など GPU/分散ライブラリとシームレスに連携し、CPU と GPU をまたいだ高速処理を実現。

 

詳しくは、以下のDaskのドキュメントを見てください。

まとめ

今回は、簡単なデモを通じDaskの高速化の一端を説明しました。

具体的には、Daskのデータフレームをメインに使い、200 ファイル・約2,000 万行の売上 CSV を月次集計するベンチマークを通じて、Pandas では約150 秒かかった処理が Dask データフレーム では遅延評価とタスクグラフにより約50 秒に短縮できることを示した。

Daskは、データフレーム 以外にも追加モジュールが用意されており、ニーズに応じて段階的に活用範囲を広げられるのが Dask の魅力です。