はじめに
AWS Neptuneを使う機会があり、Neptune作成、LambdaからNeptuneへのアクセスをまとめてみました。
目次
AWS Neptuneとは
AWS Neptuneは、フルマネージドのグラフ構造を備えたデータベースサービスです。
基本要素は以下になります。
- ノード:頂点(Node, Vertex)
- エッジ:辺(Edge)ノードとノードの関係を表す
- プロパティ:ノードとエッジの属性(Key/Value形式で保持)
ちなみにグラフは、ソーシャルネットワーク、路線図、レコメンデーション・・・等、身近なところで使用されています。
サポートされているグラフモデルとクエリ言語
Gremlin、openCypher、SPARQL の3つの言語でクエリが可能です。今回は、Gremlinを使用しました。
Neptuneを準備
Neptuneを作成する前に、あらかじめサブネットを持つVPCを作成しておきます。
マネジメントコンソールのNeptune -> データベース -> データベースを作成を選択して作成します。
画面に沿って必要な項目を設定していきます。
無料利用枠も用意されているので、今回は無料枠を選択。
予め用意したVPCとサブネットを設定します。
Gremrinクエリをするために、Jupyterノートブックも一緒に作成しました。
データベースの作成ボタンをクリックするとNeptuneが作成されます。
今回は、手動でポチポチしながら作成しましたが、CloudFormation テンプレートで環境を作成するNeptuneのハンズオンもあるので、検証でやってみるのもいいですね!
グラフデータベースってどんなもの?Amazon Neptune を使って グラフデータベースのクエリを体験しよう(準備編)
LambdaからNeptuneへアクセス
今回作成した構成は以下になります。
Lambdaでは下記をおこないます。
- Neptuneインスタンスに接続
- Gremrinクエリ生成
- 生成したGremrinクエリを実行
- クエリの実行結果をレスポンスとしてユーザーに返却
1.Neptuneインスタンスに接続
Neptuneへの接続に関するロジック。
こちらを参考にして実装。
AWS Lambda Amazon Neptune 関数の例
署名バージョン 4 署名を使用して Neptune に接続
Neptuneが IAMデータベース認証:ありの場合、バージョン 4 署名を使用して Neptune に接続を行います。
環境変数からAWSの認証情報を取得し、NeptuneデータベースへのリクエストにSigV4認証を追加。
※参考にしたコードでは、データベースURLとリクエストヘッダーのアイテムをList型で返していたけど、Dict型で返さないとエラーとなりました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# IAM認証を使用してNeptuneデータベースへのリクエストを準備する関数 def prepare_iamdb_request(database_url): service = "neptune-db" method = "GET" # Lambd予約済み環境変数から取得 access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] # 認証情報の作成 creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) # AWSRequestオブジェクトを作成し、SigV4Authを使用して認証情報を追加 request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) # リクエストヘッダーを辞書に変換 dict_hedaer = {} for k in request.headers.keys(): v = request.headers.get(k) dict_hedaer[k] = v return (database_url, dict_hedaer) |
接続情報(エンドポイントとポート番号)を取得し、リモート接続を作成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# Neptuneデータベースへの接続情報を取得する関数 def connection_info(): # Neptuneエンドポイントとポートを取得 neptune_endpoint = os.environ["neptuneEndpoint"] neptune_port = os.environ["neptunePort"] # データベースURLを作成 database_url = "wss://{}:{}/gremlin".format(neptune_endpoint, neptune_port) if "USE_IAM" in os.environ and os.environ["USE_IAM"] == "true": # リクエストにSigV4認証を追加 return prepare_iamdb_request(database_url) else: return (database_url, {}) # Neptuneデータベースへのリモート接続を作成する関数 def create_remote_connection(): # 接続情報を取得 (database_url, headers) = connection_info() return DriverRemoteConnection( database_url, "g", pool_size=1, message_serializer=serializer.GraphSONSerializersV2d0(), headers=headers ) |
Neptune コネクション作成。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
# Neptuneデータベースへの接続を作成し、グラフトラバーサルソースを返す関数 def create_connection(): # リモート接続を作成 conn = create_remote_connection() # グラフオブジェクトの作成 graph = Graph() # Neptuneにリモート接続を設定したグラフトラバーサルソースを作成 g = graph.traversal().withRemote(conn) return g, conn |
Neptuneインスタンスへの接続を管理する部分はデコレータで準備し、Neptuneへの接続とトランザクションを実装しました。
この接続処理を呼び出す関数内で例外が発生した場合、ロールバックされるようにしています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# Neptuneデータベースへの接続を管理するデコレータ def with_neptune_connection(func): @wraps(func) def wrapper(*args, **kwargs): # Neptune コネクションの作成 g, conn = create_connection() # トランザクションの作成 tran = g.tx() try: # トランザクションの開始 g_tran = tran.begin() # デコレートされた関数を実行 response = func(g_tran, *args, **kwargs) # コミット tran.commit() except Exception as e: logger.error("error: [{}] {}".format(type(e), str(e))) # ロールバック tran.rollback() finally: # トランザクションを閉じる if tran.isOpen(): tran.close() # コネクションのクローズ conn.close() return response return wrapper |
Neptune接続ロジックは、Layerにして共通化しました。
2.Gremrinクエリ生成
次はノード作成のAPIが呼び出された時のロジック(Gremrinクエリ生成と実行)です。
1 2 3 4 5 6 7 8 9 10 11 12 |
# 新しいノードを作成するためのAPIエンドポイント @app.route("/create-node", methods=["POST"], content_types=["application/json"], cors=True) @with_neptune_connection # Neptuneデータベースへの接続を管理するデコレータ def create_node(g): # リクエストボディから情報を取得 request_body = app.current_request.json_body # ノード作成のgremlinクエリを作成して実行する result = create_node_query_execute(request_body, g) return response(200, json.dumps(result, ensure_ascii=True)) |
1 2 3 4 5 6 7 8 9 10 11 12 13 |
{ "node_label_name": "test-node", "property_list": [ { "property_name": "name", "property_value": "testのノード" }, { "property_name": "memo", "property_value": "api:create-nodeで作成" } ] } |
3.生成したGremrinクエリを実行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# ノード作成のGremrinクエリを作成して実行 def create_node_query_execute(request_body: dict[str, Any], g: GraphTraversalSource) -> str: # ノード追加 クエリ作成 query = g.addV(request_body["node_label_name"]) # property_listから各プロパティの名前と値を取得して # .property({property_name},{property_value})のクエリ作成 property_list = request_body.get("property_list", []) for property in property_list: query = query.property(property["property_name"], property["property_value"]) # クエリ実行 result = query.next() return "{0}".format(result) |
ここで発行されるGremrinクエリは、下記の様に作成され、result = query.next() でクエリが発行されます。
1 |
g.addV("test-node").property("name","testのノード").property("memo","api:create-nodeで作成").next() |
クエリの実行結果
ノード作成のAPIの実行結果です。
v[xxxxxx-xxxx-xxxx-xxxx-xxxxxxxx]の値は、Neptuneで自動作成されたidが返却されます。
まとめ
AWS NeptuneとLambdaを組み合わせてグラフデータベースを操作して分かったこと
- LambdaはNeptuneと同じVPC内に作成が必要
- IAM DB認証ありの場合、署名バージョン 4 を使用し一時認証情報を取得して接続
- Gremrinクエリの実行は、
.next()
や.iterate()
などの終了ステップで実行される
今回初めてグラフデータベースというものを調査しながら触ってみて、リレーショナルデータベースで複雑なクエリで実現していたデータ間の関係性を取得するようなケースには適していたりと、グラフデータベースとリレーショナルデータベースのそれぞれの良いところを組み合わせて、両方を利用するのがいいと思いました。
投稿者プロフィール
-
2023/1にスカイアーチネットワークスにJoin
AWSを日々勉強中
最新の投稿
- AWS2024年8月20日LocalStackをつかってローカル環境でAWSサービスにアクセスしてみた
- AWS2024年4月4日AWS NeptuneにLambdaからアクセスしてみた
- セキュリティ2024年3月4日Amazon GuardDutyについてまとめてみました
- AWS2023年12月4日Amazon Redshift Streaming Ingestion をやってみた