はじめに

こんにちは。
ミルディアに入社して、ちょうど2年が経過したエンジニアの遠藤です。

入社してから、 serverless framework (以下、 serverless)を使った案件にいくつか携わり、知見も溜まってきたので、何回かに分けて、

  • とあるプロジェクトでのシステム構成
  • ハマったことや Tips
  • 便利なプラグイン

などを記事にしていきたいと思います。
今回は実際に、実務でどこまで serverelss で管理しているのかをご紹介します。

本記事は serverless を使った AWS上 のシステム構築が主な内容となります。
serverless の紹介や、いわゆるサーバレスアーキテクチャの解説はしませんので、ご了承ください。

想定している読者のターゲットは、ある程度、CUIを操作できる人を想定しています。
また、紹介しているコマンドは、 Mac OS 上でのコマンドがメインとなりますので、合わせてご了承ください。

まずは、実際のプロジェクトの開発環境でのシステム構成をお見せします。
(AWS CloudFormation Designer から出力した図です。)

細かい文字まではお見せできませんが、何やらゴツい感じのシステムが作れることはお分かりいただけるのではないでしょうか。
この構成は、 serverless だけで実現されていますが、手作業でミスなくセットアップしようと思ったら、1ヶ月以上かかるかもしれません。
確かに、EC2 や RDS が別途動いており、全てを serverless のみで完結しているわけではありませんが、それでも大幅な工数削減ができているように思います。

それでは具体的に、AWSのどんな機能がデプロイされているのか、いくつかに分解してご紹介していきたいと思います。

紹介する構成

  • API Gateway + Lambda + DynamoDB
  • API Gateway + Lambda + SNS
  • SNS + SQS + Lambda
  • CloudWatch Events + Lambda
  • CloudWatch Events + Lambda + AWS Batch
  • その他

API Gateway + Lambda + DynamoDB(and more..)

pattern_a-2

1つ目は、よくある構成だと思いますが、 function(Lambda) のトリガーとして、API Gateway (REST API) を設定するものになります。
function では、DBに接続したり、別のAPIを呼び出したりしています。

この構成の定義ファイルは以下のようなものになります。

#serverless.yml
functions:
  func_a_post:
    handler: api.func_a.post
    events:
      - http:
          path: func_a
          method: post
          cors: true
  func_a_get:
    handler: api.func_a.get
    events:
      - http:
          path: func_a
          method: get
          cors: true

以上の状態でデプロイすれば、自動的に API Gateway と Lambda function が接続された状態になってデプロイされます。
イメージですが参考までに、これを受ける function は弊社では下記のような構成にしています。

# ./api/func_a.py

#
# POST /func_a
#
def post(event, context):
  # do something
  return {
    "statusCode": 200,
    "headers": {
      "Access-Control-Allow-Origin": "*"
    }
  }


#
# GET /func_a
#
def get(event, context):
  # do something
  return {
    "statusCode": 200,
    "headers": {
      "Access-Control-Allow-Origin": "*"
    }
  }

do something の部分で、正当なリクエストかを認証したり、 DynamoDB に繋ぎに行ったり、別の API を呼び出したりといったことをします。

もし実務では使わないとしても、例えばアプリの開発時などに、本番APIが完成するまでのスタブとして使ったりするのもお手軽で良いかと思います。

API Gateway + Lambda + SNS

pattern_b

続いての構成は、入り口は同じですが、処理に時間がかかるため、一旦リクエストの受付だけするようなパターンです。
SNS に通知を publish して、本処理は後述する SNS + SQS + Lambda に引き継がれて、実行されます。

利点としては、 timeout によるエラーを抑制できる点です。
例えば、 API Gateway の場合、タイムアウトは30秒しかありません。(2020/11 現在)
また、リクエスト側がUXを考慮して、タイムアウトさせてしまうケースもあるかもしれません。
一方で、Lambdaであれば、タイムアウトは15分まで伸ばすことができます。
なので、最初から時間がかかることが見込まれていて、かつ非同期で実行しても問題ない場合は、 SNS に通知するという構成を取った方が良いです。
(もちろん、 SNS を通さずにいきなり SQS に投げてしまっても良いのですが、別のシステムが SNS に通知する仕組みを持っていたため、便乗する形で実装しました。)

serverless.yml の書き方は同じなので省略します。

python では、 boto3(AWS SDK for Python)を使って、以下のように SNS に publish しています。

import json
import uuid
import os
from boto3.session import Session


#
# publish
# sns
#
def publish(topic_arn, subject, isOffline=False):

    message = {
        "request": {
            "requestId": str(uuid.uuid4())
        },
        "key1": "value1",
        "key2": "value2"
    }
    endpoint_url = os.getenv('SNS_ENDPOINT')
    # offline(ローカル)実行の場合は、 env に定義した key を使う。
    # AWS 上で実行する場合は、 IAM Role で認証する。
    if isOffline:
        aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
        aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
        region_name = os.getenv("REGION_NAME")
        session = Session(aws_access_key_id=aws_access_key_id,
                          aws_secret_access_key=aws_secret_access_key,
                          region_name=region_name)
    else:
        session = Session()

    client = session.client('sns', endpoint_url=endpoint_url)
    request = {
        'TopicArn': topic_arn,
        'Message': json.dumps(message),
        'Subject': subject
    }
    
    response = client.publish(**request)

    return response

SNS + SQS + Lambda

sample

これは、上述した SNS のイベントを受けて実行する構成です。
定義サンプルは下記の通りです。

#serverless.yml
functions:
  hoge_queue_event:
    handler: worker.hoge_queue_event.handler
    events:
      - sqs: arn:aws:sqs:ap-northeast-1:xxxxxxxxxxxx:hoge-queue

この時、デプロイ先に SQS が無いとエラーになってしまうので、事前に作成しておくか、 yml の resources として定義しておく必要があります。

#serverless.yml
resources:
 Resources:
   HogeQueue:
     Type: "AWS::SQS::Queue"
     Properties:
       QueueName: "hoge-queue"

これを受ける handler は以下のように書いています。sqs では return で何かを返す必要はありません。

# ./worker/hoge_queue_event.py

# SQS hoge_queue_event
def handler(event, context):
  # do something
  return

このプロジェクトでどんな使い方をしているかというと、

  • ログ集計(DynamoDBの更新)
  • メールの送信
  • ユーザデータをシステム間で同期

というようなことをしています。
15分という制限があるので、あまり重たい作業はできません。

  • 処理に数分かかりそう
  • 失敗時のリトライが必要
  • 同期で実行する必要がない処理

などはこの構成を検討するのも良さそうです。

CloudWatch Events + Lambda

pattern_d

続いては、ユーザ操作などをトリガーとはせず、定期的に何かを実行するケースです。
そんな時に便利なのが、 CloudWatch Events を使った function の定期実行になります。

サンプル定義は以下の通りです。

#serverless.yml
functions:
  schedule_event:
    handler: worker.schedule_event.handler
    timeout: 900
    events:
      - schedule: cron(0/30 * * * ? *)

上記の定義では、30分おきに、何かの処理を実行します。
timeout に 900 と書くことで、15分処理を実行します。
仮に timeout してしまった場合、 Lambda がデフォルトで 2回リトライします。
リトライ不要な場合は maximumRetryAttempts を定義しておくと良さそうです。

紹介しているプロジェクトでは

  • 一定期間のログを集計して、別のシステムへ送信する
  • 仮で生成したゴミレコードを削除する

といった簡易的なバッチっぽい使い方をしています。

CloudWatch Events + Lambda + AWS Batch

pattern_e

15分あれば、そこそこ色んな処理ができますが、それでも足りない場合もあるかと思います。
そんな時は、 AWS Batch を使います。

Batch の定義は(個人的に)かなりめんどくさかったのですが、手作業で実行するよりとても簡単に設定できました。
こちらの環境構築にあたっては、 下記のブログを大いに参考にしています。

Batch をデプロイする事前準備として、 ECR にバッチが起動する際の Docker Image をプッシュしておく必要があります。
AWSコンソールから、ECRに適当な名前のリポジトリを作成します。

その後、以下のコマンドで、 Image をプッシュできます。(AWS CLIが必要)

ECRにログインする

$ $(aws ecr get-login --no-include-email --region ap-northeast-1)

== 2021/09/02 追記 ==

上記のコマンドは、非推奨となりましたので、修正いたします。
https://docs.aws.amazon.com/cli/latest/reference/ecr/get-login.html

Note: This command is deprecated. Use get-login-password instead.

現在は、以下のコマンドを使用します。

ECR_URI=xxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com
$ aws ecr get-login-password --region ap-northeast-1 | docker login --username AWS --password-stdin ${ECR_URI}

== 2021/09/02 追記 ここまで ==

dockerファイルのビルド

$ docker build -t batch-repo .

dockerイメージにタグ付け

$ docker tag batch-repo:latest xxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/batch-repo:latest

ECR にプッシュ

$ docker push xxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/batch-repo:latest

上記まで準備が完了したらは、 servereless に定義を書いていきます。

resources:
  Parameters:
    # subnet と security group は既存のものを参照している
    SubnetIds:
      Type: List<AWS::EC2::Subnet::Id>
      Default: "subnet-xxxxxxxxxxxx, subnet-xxxxxxxxxxxx"
    SecurityGroupIds:
      Type: List<AWS::EC2::SecurityGroup::Id>
      Default: "sg-xxxxxxxxxxxxxxxx"
  Resources:
    BatchServiceRole:
      Type: AWS::IAM::Role
      Properties:
        AssumeRolePolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Principal:
              Service:
              - batch.amazonaws.com
            Action:
            - sts:AssumeRole
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole
    # Lambda 自体の role は参照されないので別途必要なポリシーを定義する
    ecsInstanceRole: 
      Type: AWS::IAM::Role
      Properties:
        AssumeRolePolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Principal:
              Service:
              - ec2.amazonaws.com
            Action:
            - sts:AssumeRole
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role
        Policies:
          # S3 バケットへのアクセス件を付与する
          - PolicyName: batch-s3-policy
            PolicyDocument:
              Version: '2012-10-17'
              Statement:
                - Effect: Allow
                  Action: s3:*
                  Resource:
                    - Fn::Sub: arn:aws:s3:::hoge_bucket
                    - Fn::Sub: arn:aws:s3:::hoge_bucket/*
          # DynamoDB へのアクセス件を付与する
          - PolicyName: batch-dynamodb-policy
            PolicyDocument:
              Version: '2012-10-17'
              Statement:
                - Effect: Allow
                  Action: dynamodb:*
                  Resource:
                    - arn:aws:dynamodb:ap-northeast-1:*:table/hoge_table
    ecsInstanceProfile:
      Type: AWS::IAM::InstanceProfile
      Properties:
        Roles:
          # 上で作った role を参照する
          - !Ref ecsInstanceRole
    SlsComputeEnv: # コンピューティング環境
      Type: AWS::Batch::ComputeEnvironment
      Properties:
        Type: MANAGED
        ServiceRole: !GetAtt BatchServiceRole.Arn
        ComputeEnvironmentName: your_compute_env_name
        ComputeResources:
          MaxvCpus: 4
          MinvCpus: 0
          SecurityGroupIds: !Ref SecurityGroupIds
          InstanceRole: !GetAtt ecsInstanceProfile.Arn
          Subnets: !Ref SubnetIds
          Type: EC2
          InstanceTypes:
            - optimal
        State: ENABLED
    SlsJobQueue: # ジョブキュー
      Type: AWS::Batch::JobQueue
      Properties:
        JobQueueName: your_job_queue_name
        ComputeEnvironmentOrder:
          - Order: 1
            ComputeEnvironment: !Ref SlsComputeEnv
        State: ENABLED
        Priority: 1
    SlsJobDefinition: # ジョブ定義
      Type: AWS::Batch::JobDefinition
      Properties:
        Type: container
        JobDefinitionName: your_job_definition_name
        ContainerProperties:
          Command:
            - python
            - batch.py
          Memory: 256
          Vcpus: 2
          Image: xxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/batch-repo:latest # ecr の URL

上記の定義では、 subnet id、 security group は既存のものを利用する想定で記載していますが、 serverless で定義して利用することも可能です。(参考)

これでデプロイすると、AWS Batch上に、「ジョブ定義」「ジョブキュー」「コンピューティング環境」が自動的に作られます。

以上で AWS Batch の構築は完了です。
あとは、 作成した Job 定義を 定期的に実行してあげれば完成です。

呼び出し方は、 CloudWatch Events + Lambda と同じです。

import boto3


def batch(event, context):
    client = boto3.client('batch')
    your_job_name = "your_job_name"
    your_job_queue_name = "your_job_queue_name"
    your_job_definition_name = "your_job_definition_name"

    result = client.submit_job(
        jobName=your_job_name,
        jobQueue=your_job_queue_name,
        jobDefinition=your_job_definition_name
    )

    return

これで、 Docker 上で、 python batch.py が実行されます。

弊社では使っていませんが、起動時コマンドのパラメータなどを動的に設定することもできます。
https://docs.aws.amazon.com/ja_jp/batch/latest/userguide/job_definition_parameters.html

実務では外部システムとのデータ同期として、夜間 Batch を走らせています。

この構成の注意点として、即時実行されないことがあります。
必要に応じて勝手にインスタンスを作ってくれて便利ではあるのですが、 Job Queue に Job が入ると、まずインスタンスの起動から始めます。(前回のインスタンスが残っていると、それをそのまま使ってくれることもあるみたい。)
結果として、スクリプトが開始されるのは、 トリガーを受けて、 2~3 分後となります。

処理時間が長いため Batch を選択することが多いかと思いますので、 2~3 分を気にするような処理は無いかと思いますが、念の為ご注意ください。
Batch の起動用に、Lambdaを走らせているので、何かちょっとした作業を並行して Lambda で実行する、という構成もアリかもしれないですね。

その他(DynamoDB, S3, Firehose)

基本的に、AWS上のリソースを手作業で構築していくと、設定をミスしたり、作り忘れたりするので serverless で作れるものは全部作ってしまっています。

DynamoDB

resources:
  Resources:
    # ハッシュのみをKeyとするテーブル
    HashTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: HashTableName
        AttributeDefinitions:
          - AttributeName: key
            AttributeType: S
        KeySchema:
          - AttributeName: key
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
    # ハッシュとレンジをKeyとするテーブル
    HashRangeTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: HashRangeTableName
        AttributeDefinitions:
          - AttributeName: hash_key_name
            AttributeType: S
          - AttributeName: range_key_name
            AttributeType: S
          - AttributeName: global_secondary_index_name
            AttributeType: S
        KeySchema:
          - AttributeName: hash_key_name
            KeyType: HASH
          - AttributeName: range_key_name
            KeyType: RANGE
        # グローバルセカンダリインデックスの設定も可能
        # 定義する場合は、 AttributeDefinitions に 項目名を定義しておく必要がある
        GlobalSecondaryIndexes:
          - IndexName: HogeIndex
            KeySchema:
            - AttributeName: global_secondary_index_name
              KeyType: HASH
            Projection:
              ProjectionType: ALL
            ProvisionedThroughput:
              ReadCapacityUnits: 1
              WriteCapacityUnits: 1
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1

S3 bucket

Firehose のデータの投げ先用だったり、静的コンテンツの格納先だったりで使う bucket なんかも定義できます。

resources:
  Resources:
    OneTouchPassIds:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: firehose-stream-bucket-name

Firehose

アクセスログなどを Kinesis Firehose に投げておくと、定期的に S3 に吐き出してくれます。
あとで集計に使ったりしますが、そのリソースも、 serverless で定義して作成することができます。

resources:
  Resources:
    YourStreamFirehose:
      Type: AWS::KinesisFirehose::DeliveryStream
      Properties:
        DeliveryStreamName: firehose-stream-name
        S3DestinationConfiguration:
          BucketARN:
            Fn::Sub: arn:aws:s3:::firehose-stream-bucket-name
          BufferingHints:
            IntervalInSeconds: 300
            SizeInMBs: 5
          CompressionFormat: UNCOMPRESSED
          RoleARN:
            Fn::GetAtt: [FirehoseRole, Arn]

実行する

これで、あとは

$ sls deploy

と実行すれば、環境構築も、機能追加のデプロイなども完了します。

serverless で何ができるか、ということは何となく知っていても、実務ではどれくらい利用されているのか?というのを知る機会はあまりないかと思います。
この記事が何かの参考になって、次の案件は serverless 使ってみようかな、となっていただけたら幸いです。

まとめ

今回紹介した内容がそれぞれ、どんな用途に向いているか、ざっくりとまとめておきます(主観も入っておりますがご了承ください。。。)

構成 実行時間 定期実行 環境構築
API Gateway + Lambda + DynamoDB 30秒以内
API Gateway + Lambda + SNS 15分以内※
(30秒以内のレスポンス)
普通
SNS + SQS + Lambda 15分以内 普通
CloudWatch Events + Lambda 15分以内
CloudWatch Events + Lambda + AWS Batch 15分以上

※ 本処理はSNS + SQS + Lambdaに引き継いで、非同期で実行可能

他にもセキュリティ面だったり、性能や費用面で考慮に入れると変わってくることもあります。

また、そもそも serverless(Lamda) なのか、AWSなのか、クラウドなのか、既存のサービスを使っても良いのでは?
と、色々な観点から考えていくことが大事ですので、あくまでも、参考程度に見ていただければと思います。

次回は、 serverless を使ってハマったことなどをご紹介していきたいと思います。