この記事は、 AWS Lambda Advent Calendar 2017 - Qiita の14日目分として書かせていただきます。
仮想通貨には複数の取引所があります。それぞれの取引所ごとに売買の値段が変わってくるため、取引所Aの購入価格よりも取引所Bの売却価格が高い(・・・★)ことがあります。その差を使って利益を出すことをアービトラージ(arbitrage)と言います。★のような状態を検知する仕組み(Twitter Bot)を作ってみました。
【高値】 1,895,520円/BTC@zaif 【安値】 1,889,298円/BTC@coincheck 現在、約0.33%の価格差となっております。 — arbitrage4coins (@arbitrage4coins) 2017年12月13日
この仕組は Lambda と Step Functions を用いて、サーバレスに構築しています。この記事では、その構築方法について解説していきたいと思います。
(言い訳)Lambda Function の Python コードについては全くリファクタリングしていない状態です(12/14現在)。こちらについては、こっそり少しずつリファクタリングしていきますので、目をつぶっていただければと思います。(もちろん、こういう書き方をした方がいい!というアドバイスは大歓迎でございます😃)
構築のざっくりとした流れ
- 取引所ごとのビットコイン最終取引価格を取得し、DynamoDB にデータ追加する Lambda Function を作成
- bitFlyer
- Zaif
- Coincheck
- 3つの取引所の価格を比較して差額の割合を計算、1%を超えていたらTwitterに投稿する Lambda Function を作成する
- Step Functions で、最初の3つの Lambda Function を並列で実施、この3つが終わるのを待ち最後の Lambda Function を実行するワークフローを作成
- Cloud Watch で定期的にこのワークフローを実行するようにする
取引所ごとのビットコイン最終取引価格を取得し、DynamoDB にデータ追加する Lambda Function を作成
各取引所のAPIを呼び出し、DynamoDB にその時の価格を入れる処理を書いていきます。外部APIの呼び出し方法については、こちらの記事でまとめておりますのでご参照ください。
bitFlyer
# -*- coding: utf-8 -*- import requests import boto3 import json import decimal import datetime def lambda_handler(event, context): bf_res = requests.get("https://api.bitflyer.jp/v1/ticker?product_code=BTC_JPY") _insert_dynamo(_get_current_datetime_as_YYYYMMDDHHMM(), bf_res.json()["ltp"]) def _get_current_datetime_as_YYYYMMDDHHMM(): now_jst = datetime.datetime.now() + datetime.timedelta(hours=9) return now_jst.strftime('%Y%m%d%H%M') def _insert_dynamo(datetime, price): dynamo = boto3.resource('dynamodb').Table("coin_prices") json_str = ''' { "datetime": "%s", "exchange": "bitflyer", "price": %f } ''' % (datetime, price) item = json.loads(json_str, parse_float=decimal.Decimal) dynamo.put_item(Item = item) return {"status": "OK"}
Zaif
# -*- coding: utf-8 -*- import requests import boto3 import json import decimal import datetime def lambda_handler(event, context): zaif_res = requests.get("https://api.zaif.jp/api/1/ticker/btc_jpy") _insert_dynamo(_get_current_datetime_as_YYYYMMDDHHMM(), zaif_res.json()["last"]) def _get_current_datetime_as_YYYYMMDDHHMM(): now_jst = datetime.datetime.now() + datetime.timedelta(hours=9) return now_jst.strftime('%Y%m%d%H%M') def _insert_dynamo(datetime, price): dynamo = boto3.resource('dynamodb').Table("coin_prices") json_str = ''' { "datetime": "%s", "exchange": "zaif", "price": %f } ''' % (datetime, price) item = json.loads(json_str, parse_float=decimal.Decimal) dynamo.put_item(Item = item) return {"status": "OK"}
Coincheck
# -*- coding: utf-8 -*- import requests import boto3 import json import decimal import datetime def lambda_handler(event, context): cc_res = requests.get("https://coincheck.com/api/ticker") _insert_dynamo(_get_current_datetime_as_YYYYMMDDHHMM(), cc_res.json()["last"]) def _get_current_datetime_as_YYYYMMDDHHMM(): now_jst = datetime.datetime.now() + datetime.timedelta(hours=9) return now_jst.strftime('%Y%m%d%H%M') def _insert_dynamo(datetime, price): dynamo = boto3.resource('dynamodb').Table("coin_prices") json_str = ''' { "datetime": "%s", "exchange": "coincheck", "price": %f } ''' % (datetime, price) item = json.loads(json_str, parse_float=decimal.Decimal) dynamo.put_item(Item = item) return {"status": "OK"}
「1Lambda Function 1責務」としたかったので別の Function にしたのですが、ここまで共通部分があるのだったら分けた意味がない気もしてきました。分けたおかげで Step Functions の並列処理を学ぶことができたのでいいとします。
3つの取引所の価格を比較して差額の割合を計算、1%を超えていたらTwitterに投稿する Lambda Function を作成する
Lambda から Twitter への投稿はこちらにまとめていますので、よろしければこちらもご参照ください。
# -*- coding: utf-8 -*- import boto3 import json import decimal import datetime from boto3.dynamodb.conditions import Key, Attr from requests_oauthlib import OAuth1Session CK = '***' # Consumer Key CS = '***' # Consumer Secret AT = '***' # Access Token AS = '***' # Accesss Token Secert UPDATE_URL = 'https://api.twitter.com/1.1/statuses/update.json' def lambda_handler(event, context): min_coin_price = None max_coin_price = None # 各取引所のBTC現在価格を取得する coin_prices = _get_coin_prices(_get_current_datetime_as_YYYYMMDDHHMM()) # 最低価格、最高価格を決定する for coin_price in coin_prices: if min_coin_price is None: min_coin_price = coin_price max_coin_price = coin_price else: if coin_price['price'] < min_coin_price['price']: min_coin_price = coin_price if max_coin_price['price'] < coin_price['price']: max_coin_price = coin_price delta_rate = (max_coin_price['price'] - min_coin_price['price']) / min_coin_price['price'] * 100 if 1 <= delta_rate: _tweet(min_coin_price, max_coin_price, delta_rate) def _tweet(min_coin_price, max_coin_price, delta_rate): tweet_str = "【高値】 " + "{:,}".format(max_coin_price["price"]) + "円/BTC@" + max_coin_price["exchange"] + "\n"\ + "【安値】 " + "{:,}".format(min_coin_price["price"]) + "円/BTC@" + min_coin_price["exchange"] + "\n"\ + "現在、約" + str("%.2f" % delta_rate) + "%の価格差となっております。" params = {"status": tweet_str } twitter = OAuth1Session(CK, CS, AT, AS) req = twitter.post(UPDATE_URL, params = params) if req.status_code == 200: return tweet_str else: return req.status_code def _get_current_datetime_as_YYYYMMDDHHMM(): now_jst = datetime.datetime.now() + datetime.timedelta(hours=9) return now_jst.strftime('%Y%m%d%H%M') def _get_coin_prices(datetime): dynamo = boto3.resource('dynamodb').Table("coin_prices") return dynamo.query( KeyConditionExpression=Key('datetime').eq(datetime) )['Items']
トークン系は環境変数に持っていった方が良さそうですね。どこかで修正します。ばーっと書いてしまっているので、合わせて補足もしていければと思います。
Step Functions で、最初の3つの Lambda Function を並列で実施、この3つが終わるのを待ち最後の Lambda Function を実行するワークフローを作成
ビジュアルワークフローはこのようなものを作成しました。
続いてJSONコードです。実は Step Functions を触るのが去年のアドベントカレンダー以来だったのですが、当時よりも書きにくさを感じなかったのは気のせいでしょうか。記入時に Lambda Function の arn を保管してくれるのが地味に便利ですね。
{ "Comment": "", "StartAt": "Parallel", "States": { "Parallel": { "Type": "Parallel", "Next": "check_arbitrage", "Branches": [ { "StartAt": "bitflyer_btc", "States": { "bitflyer_btc": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:4xxxxxxxxxxx:function:bitflyer_btc", "End": true } } }, { "StartAt": "zaif_btc", "States": { "zaif_btc": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:4xxxxxxxxxxx:function:zaif_btc", "End": true } } }, { "StartAt": "coincheck_btc", "States": { "coincheck_btc": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:4xxxxxxxxxxx:function:coincheck_btc", "End": true } } } ] }, "check_arbitrage": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:4xxxxxxxxxxx:function:check_arbitrage", "End": true } } }
このように定義しています。check_arbitrage だけは、他の3つの処理が終わってから実施するように定義しています。
Cloud Watch で定期的にこのワークフローを実行するようにする
3分に一度、Step Functions ワークフローを実行するように定義しました。
こんな風にツイートしてくれます。(構築段階なので1%のしきい値を外してます。)
まとめ
- Lambda と Step Functions を用いてちょっとした Fintech な仕組みを作ってみました。
- Twitter bot とサーバレスアーキテクチャはかなり相性がいいと感じています。
- そういえば、料金の試算をしていないので、クラウド破産する前に計算してみます。Step Functon の状態遷移課金と、DynamoDB のデータを一切 purge していないのがネックになりそう。