はじめに
SQLAlchemyを使用した既存システムのエンハンスを担当したときに、大量データを取り扱うシーンに直面しました。
数十万件程度のデータを取得するときに処理時間が長くなり、性能の劣化を懸念していましたが、改善に向けて試行錯誤した内容をブログにまとめました。
なお、タイトルの通りSQLAlchemyを使用していますが、結局は生SQLを発行しています。
処理速度を早くする選択肢の一つとしてご参考になればと思っています。
目次
実行環境
- requirements.txt
1 2 |
sqlalchemy==2.0.23 pymysql==1.1.0 |
- DB
下図のような構成で、employeeテーブルに例として100万件データが存在する状態です。
-
Database
1 2 3 4 5 6 7 8 9 10 11 12 13 |
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import scoped_session, sessionmaker # 接続先DBの設定 DATABASE = "mysql+pymysql://{username}:{password}@{hostname}/{database}?charset={charset}" # Engine の作成 Engine = create_engine(DATABASE, echo={echo}, pool_recycle={recycle}) Base = declarative_base() # Sessionの作成 session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=Engine, info={"in_transaction": False})) |
- Model
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
from datetime import datetime from database.database import Base from sqlalchemy import Column from sqlalchemy.dialects.mysql.types import INTEGER, CHAR, VARCHAR, DATETIME class Employee(Base): __tablename__ = "employee" employee_id: Column[int] = Column(INTEGER, primary_key=True, nullable=False, autoincrement=True) section_id: Column[str] = Column(VARCHAR(10), primary_key=True, nullable=False) employee_name: Column[str] = Column(VARCHAR(64), nullable=False) hire_date: Column[datetime] = Column(DATETIME, nullable=False) post: Column[str] = Column(CHAR(2), nullable=False) sex: Column[str] = Column(CHAR(1), nullable=False) birth_day: Column[datetime] = Column(DATETIME, nullable=False) |
通常の取得方法
下記コードのようにsession.query
でデータを取得します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
from datetime import datetime from database.database import session from model.employee import Employee # 開始時間を取得 start_datetime = datetime.now() # 100万件取得(all) employee_list: list[Employee] = session.query(Employee).all() # 終了時間を取得 finish_datetime = datetime.now() # 処理時間を出力 print("処理時間: " + str(finish_datetime - start_datetime)) |
処理時間:0:00:33.686847(約33秒)
ちなみに検索条件を指定する場合は下記のようにfilter
を使用します。
1 2 |
# 条件を指定する場合 employee_list: list[Employee] = session.query(Employee).filter(Employee.employee_id == "xxxxx").all() |
取得結果は下記のように参照します。
ORMですのでモデルのプロパティを指定することで値を取得できます。
ループ部分は内包表記で書くこともできます。
1 2 3 |
# 取得結果の参照方法 for employee in employee_list: print(employee.employee_name) |
Core機能を使用した取得方法
下記コードのようにselect
関数を使用してデータを取得します。
SELECT ステートメントの使用
Core機能を使用しているのでORM機能は利用できません。
そのため、取得結果はタプルのリストとなり、モデルのプロパティで取得することはできなくなります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
from datetime import datetime from database.database import session from model.employee import Employee from sqlalchemy import select # 開始時間を取得 start_datetime = datetime.now() # 100万件取得(core) select_statement = select( Employee.employee_id, Employee.section_id, Employee.employee_name, Employee.hire_date, Employee.post, Employee.sex, Employee.birth_day ) employee_list = session.execute(select_statement) # 終了時間を取得 finish_datetime = datetime.now() # 処理時間を出力 print("処理時間: " + str(finish_datetime - start_datetime)) |
処理時間:0:00:22.771209(約22秒)
少し早くなりました!
もしかしたら取得結果とオブジェクトとのマッピングが膨大な件数のため、オーバーヘッドになっていたかもしれませんね。
検索条件を指定する場合は下記のようにwhere
を使用します。
1 2 3 4 5 6 7 8 9 10 |
# 条件を指定する場合 select_statement = select( Employee.employee_id, Employee.section_id, Employee.employee_name, Employee.hire_date, Employee.post, Employee.sex, Employee.birth_day ).where(Employee.employee_id == "xxxxx") |
取得結果は下記のように列番号を指定して参照します。
こちらもループ部分は内包表記で書くことができます。
1 2 3 4 |
# 取得結果の参照方法 for employee in employee_list: # employee_nameを取得する場合 print(employee[2]) |
生SQLを実行する取得方法
SQLAlchemyでは記述したコードからクエリを自動で作成してくれる機能があります。
Core機能を使用した取得方法ではオブジェクトのマッピングを行わないことで処理速度が速くなりました。
ならば、コード上で生のSQLを指定すればクエリ自動作成の処理時間分早くなるのではないか…?ということで検証してみました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
from datetime import datetime from database.database import session from sqlalchemy import text # 開始時間を取得 start_datetime = datetime.now() # 100万件取得(raw) query = text("SELECT * FROM employee;") employee_list = session.execute(query) # 終了時間を取得 finish_datetime = datetime.now() # 処理時間を出力 print("処理時間: " + str(finish_datetime - start_datetime)) |
処理時間:0:00:18.334187(約18秒)
わずかに早くなりました!
直面した問題に対してはこの方法で対応することにしました。
検索条件を指定する場合は下記のようにbindparam
を使用します。
1 2 3 |
# 条件を指定する場合 from sqlalchemy import text, bindparam query = text("SELECT * FROM employee WHERE employee_id = :employee_id;").bindparams(bindparam("employee_id", "xxxxx")) |
取得結果はCore機能と同様にタプルのリストとなりますので、列番号を指定して参照します。
こちらもループ部分は内包表記で書くことができます。
1 2 3 4 |
# 取得結果の参照方法 for employee in employee_list: # employee_nameを取得する場合 print(employee[2]) |
まとめ
実施内容と結果は下記の通りです。
実施内容 | 処理時間 |
---|---|
通常の取得方法 | 約33秒 |
Core機能を使用した取得方法 | 約22秒 |
生SQLを実行する取得方法 | 約18秒 |
生のSQLを実行することで処理時間が早くなりました。
SQLAlchemyを使用しておきながらこのような使い方は申し訳なさがありますが…ライブラリや言語を変更できない状況においては選択肢の一つになるかなと思いました。
投稿者プロフィール
-
2021年4月からスカイアーチに中途入社しました。
AWSともっと仲良くなるべく日々勉強中です。
最新の投稿
- AWS2024年1月9日LambdaでX-Rayトレースが出てこない? ~パッシブ計測にお気を付けください~
- MySQL2024年1月9日SQLAlchemyで生SQLを発行して大量データの取得を高速化してみた
- MySQL2024年1月9日SQLAlchemyのINSERTを高速化してみた
- DevOps2023年8月15日SAM CLI で Lambda 関数のリモート呼び出し機能を使ってみた