AWS Lambda と Step Functions で作るサーバレスな仮想通貨アービトラージ検知システム

 この記事は、 AWS Lambda Advent Calendar 2017 - Qiita の14日目分として書かせていただきます。

 仮想通貨には複数の取引所があります。それぞれの取引所ごとに売買の値段が変わってくるため、取引所Aの購入価格よりも取引所Bの売却価格が高い(・・・★)ことがあります。その差を使って利益を出すことをアービトラージ(arbitrage)と言います。★のような状態を検知する仕組み(Twitter Bot)を作ってみました。

twitter.com

この仕組は Lambda と Step Functions を用いて、サーバレスに構築しています。この記事では、その構築方法について解説していきたいと思います。

(言い訳)Lambda Function の Python コードについては全くリファクタリングしていない状態です(12/14現在)。こちらについては、こっそり少しずつリファクタリングしていきますので、目をつぶっていただければと思います。(もちろん、こういう書き方をした方がいい!というアドバイスは大歓迎でございます😃)

構築のざっくりとした流れ

  • 取引所ごとのビットコイン最終取引価格を取得し、DynamoDB にデータ追加する Lambda Function を作成
    • bitFlyer
    • Zaif
    • Coincheck
  • 3つの取引所の価格を比較して差額の割合を計算、1%を超えていたらTwitterに投稿する Lambda Function を作成する
  • Step Functions で、最初の3つの Lambda Function を並列で実施、この3つが終わるのを待ち最後の Lambda Function を実行するワークフローを作成
  • Cloud Watch で定期的にこのワークフローを実行するようにする

取引所ごとのビットコイン最終取引価格を取得し、DynamoDB にデータ追加する Lambda Function を作成

 各取引所のAPIを呼び出し、DynamoDB にその時の価格を入れる処理を書いていきます。外部APIの呼び出し方法については、こちらの記事でまとめておりますのでご参照ください。

www.ketancho.net

bitFlyer

# -*- coding: utf-8 -*-

import requests
import boto3
import json
import decimal
import datetime

def lambda_handler(event, context):
    bf_res = requests.get("https://api.bitflyer.jp/v1/ticker?product_code=BTC_JPY")
    _insert_dynamo(_get_current_datetime_as_YYYYMMDDHHMM(), bf_res.json()["ltp"])

def _get_current_datetime_as_YYYYMMDDHHMM():
    now_jst = datetime.datetime.now() + datetime.timedelta(hours=9)
    return now_jst.strftime('%Y%m%d%H%M')

def _insert_dynamo(datetime, price):
    dynamo = boto3.resource('dynamodb').Table("coin_prices")

    json_str = '''
    {
        "datetime": "%s",
        "exchange": "bitflyer",
        "price": %f
    }
    ''' % (datetime, price)

    item = json.loads(json_str, parse_float=decimal.Decimal)
    dynamo.put_item(Item = item)

    return {"status": "OK"}

Zaif

# -*- coding: utf-8 -*-

import requests
import boto3
import json
import decimal
import datetime

def lambda_handler(event, context):
    zaif_res = requests.get("https://api.zaif.jp/api/1/ticker/btc_jpy")
    _insert_dynamo(_get_current_datetime_as_YYYYMMDDHHMM(), zaif_res.json()["last"])

def _get_current_datetime_as_YYYYMMDDHHMM():
    now_jst = datetime.datetime.now() + datetime.timedelta(hours=9)
    return now_jst.strftime('%Y%m%d%H%M')

def _insert_dynamo(datetime, price):
    dynamo = boto3.resource('dynamodb').Table("coin_prices")

    json_str = '''
    {
        "datetime": "%s",
        "exchange": "zaif",
        "price": %f
    }
    ''' % (datetime, price)

    item = json.loads(json_str, parse_float=decimal.Decimal)
    dynamo.put_item(Item = item)

    return {"status": "OK"}

Coincheck

# -*- coding: utf-8 -*-

import requests
import boto3
import json
import decimal
import datetime

def lambda_handler(event, context):
    cc_res = requests.get("https://coincheck.com/api/ticker")
    _insert_dynamo(_get_current_datetime_as_YYYYMMDDHHMM(), cc_res.json()["last"])

def _get_current_datetime_as_YYYYMMDDHHMM():
    now_jst = datetime.datetime.now() + datetime.timedelta(hours=9)
    return now_jst.strftime('%Y%m%d%H%M')

def _insert_dynamo(datetime, price):
    dynamo = boto3.resource('dynamodb').Table("coin_prices")

    json_str = '''
    {
        "datetime": "%s",
        "exchange": "coincheck",
        "price": %f
    }
    ''' % (datetime, price)

    item = json.loads(json_str, parse_float=decimal.Decimal)
    dynamo.put_item(Item = item)

    return {"status": "OK"}

「1Lambda Function 1責務」としたかったので別の Function にしたのですが、ここまで共通部分があるのだったら分けた意味がない気もしてきました。分けたおかげで Step Functions の並列処理を学ぶことができたのでいいとします。

3つの取引所の価格を比較して差額の割合を計算、1%を超えていたらTwitterに投稿する Lambda Function を作成する

 Lambda から Twitter への投稿はこちらにまとめていますので、よろしければこちらもご参照ください。

www.ketancho.net

# -*- coding: utf-8 -*-

import boto3
import json
import decimal
import datetime
from boto3.dynamodb.conditions import Key, Attr
from requests_oauthlib import OAuth1Session

CK = '***' # Consumer Key
CS = '***' # Consumer Secret
AT = '***' # Access Token
AS = '***' # Accesss Token Secert

UPDATE_URL = 'https://api.twitter.com/1.1/statuses/update.json'

def lambda_handler(event, context):
    min_coin_price = None
    max_coin_price = None

    # 各取引所のBTC現在価格を取得する
    coin_prices = _get_coin_prices(_get_current_datetime_as_YYYYMMDDHHMM())

    # 最低価格、最高価格を決定する
    for coin_price in coin_prices:
        if min_coin_price is None:
            min_coin_price = coin_price
            max_coin_price = coin_price
        else:
            if coin_price['price'] < min_coin_price['price']:
                min_coin_price = coin_price
            if max_coin_price['price'] < coin_price['price']:
                max_coin_price = coin_price

    delta_rate = (max_coin_price['price'] - min_coin_price['price']) / min_coin_price['price'] * 100
    if 1 <= delta_rate:
        _tweet(min_coin_price, max_coin_price, delta_rate)

def _tweet(min_coin_price, max_coin_price, delta_rate):
    tweet_str = "【高値】 " + "{:,}".format(max_coin_price["price"]) + "円/BTC@" + max_coin_price["exchange"] + "\n"\
        + "【安値】 " + "{:,}".format(min_coin_price["price"]) + "円/BTC@" + min_coin_price["exchange"] + "\n"\
        + "現在、約" + str("%.2f" % delta_rate) + "%の価格差となっております。"
    params = {"status": tweet_str }
    twitter = OAuth1Session(CK, CS, AT, AS)
    req = twitter.post(UPDATE_URL, params = params)
    if req.status_code == 200:
        return tweet_str
    else:
        return req.status_code

def _get_current_datetime_as_YYYYMMDDHHMM():
    now_jst = datetime.datetime.now() + datetime.timedelta(hours=9)
    return now_jst.strftime('%Y%m%d%H%M')

def _get_coin_prices(datetime):
    dynamo = boto3.resource('dynamodb').Table("coin_prices")
    return dynamo.query(
        KeyConditionExpression=Key('datetime').eq(datetime)
    )['Items']

トークン系は環境変数に持っていった方が良さそうですね。どこかで修正します。ばーっと書いてしまっているので、合わせて補足もしていければと思います。

Step Functions で、最初の3つの Lambda Function を並列で実施、この3つが終わるのを待ち最後の Lambda Function を実行するワークフローを作成

 ビジュアルワークフローはこのようなものを作成しました。

f:id:ketancho_jp:20171214050748p:plain

続いてJSONコードです。実は Step Functions を触るのが去年のアドベントカレンダー以来だったのですが、当時よりも書きにくさを感じなかったのは気のせいでしょうか。記入時に Lambda Function の arn を保管してくれるのが地味に便利ですね。

{
  "Comment": "",
  "StartAt": "Parallel",
  "States": {
    "Parallel": {
      "Type": "Parallel",
      "Next": "check_arbitrage",
      "Branches": [
        {
          "StartAt": "bitflyer_btc",
          "States": {
            "bitflyer_btc": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:4xxxxxxxxxxx:function:bitflyer_btc",
              "End": true
            }
          }
        },
        {
          "StartAt": "zaif_btc",
          "States": {
            "zaif_btc": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:4xxxxxxxxxxx:function:zaif_btc",
              "End": true
            }
          }
        },
        {
          "StartAt": "coincheck_btc",
          "States": {
            "coincheck_btc": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:4xxxxxxxxxxx:function:coincheck_btc",
              "End": true
            }
          }
        }
      ]
    },
    "check_arbitrage": {
        "Type": "Task",
        "Resource": "arn:aws:lambda:us-east-1:4xxxxxxxxxxx:function:check_arbitrage",
        "End": true
    }
  }
}

このように定義しています。check_arbitrage だけは、他の3つの処理が終わってから実施するように定義しています。

Cloud Watch で定期的にこのワークフローを実行するようにする

 3分に一度、Step Functions ワークフローを実行するように定義しました。

f:id:ketancho_jp:20171214051203p:plain

f:id:ketancho_jp:20171214051211p:plain

こんな風にツイートしてくれます。(構築段階なので1%のしきい値を外してます。)

まとめ

  • Lambda と Step Functions を用いてちょっとした Fintech な仕組みを作ってみました。
  • Twitter bot とサーバレスアーキテクチャはかなり相性がいいと感じています。
  • そういえば、料金の試算をしていないので、クラウド破産する前に計算してみます。Step Functon の状態遷移課金と、DynamoDB のデータを一切 purge していないのがネックになりそう。

今後

  • BTC 以外の仮想通貨にも対応したい。
  • ビビリなのでアービトラージすることはない気がする。

www.ketancho.net