かっこいい名前のInfluxDB、時系列(time series)データベースとして人気のようです。もちろん各種認証でアクセス制限をかけることができますが、HTTP APIを持っており特に作りこまなくてもデータを転送できるという手軽さがあります。そして速い(個人の感想です)。
今回はPython3とPowerShellでデータを投入するためのクライアントを作成してみます。InfluxDBのインストールは別記事で。検証環境はWindows 10 Pro + Python 3.6です。
もくじ:
InfluxDBの基本的な使い方
(フラットな)JSON形式でデータを挿入していくところは他のドキュメント型DBと似ているものの、スキーマには時系列データを扱うためにやや特殊な制約があります。
基本的なデータ構造は下記のようなスキーマ。measurement
とfields
は必須です。
1 2 3 4 5 | point = {"measurement": "...", "tags": {"tag1": "..."}, "fields": {"metric1": "xxx", "metric2": "yyy", ...}, "time": "..."} |
よくmeasurementはRDBでいうところのテーブルに相当すると説明されています。tagはインデクスされないフィルタ用のカラム、timeはそのままタイムスタンプですね。データの値そのものをfieldsに指定します。
データの有効期間(やレプリケーション設定)はretention policyと呼ばれる設定により定義されており、初期状態でデフォルトのポリシが構成されています。
データを送信するところまでできればいいので、検索クエリを直接使うケースについて今回は省略します。また、他のDBMSと同様に複数インスタンスによるクラスタリングも可能ですが、今回はシングルノードを想定します。
PythonでのInfluxDBクライアントモジュール
サードパーティで開発されているinfluxdbモジュール(InfluxDB-Python)はpip経由でインストールできます。
1 | > pip install influxdb |
手元で試したところバージョンは5.2.0が入りました。
HTTP APIをラップしており、内部的にはrequestsモジュールを使っているようです。
サンプルの全体は後述しますが、クライアント実装用にクラスclientUtility()
を定義しました。
influxdbモジュールのクライアント機能はinfluxdb.InfluxDBClient()
オブジェクト経由で利用します。
InfluxDBClientオブジェクト
少なくともInfluxDBサーバのアドレス(ホスト名)を指定するのがよくある使い方だと思います。省略すると自ホストに接続しに行きます。
1 2 3 4 | from influxdb import InfluxDBClient self.client = InfluxDBClient(host=remote_host, port=remote_port, database=db_name) |
以降はこのオブジェクトのメソッドを呼び出してサーバを操作することになります。
データベースの作成
このあたりも使い方は見た通りですね。ついでにポリシも設定しておきます。
有効期限(duration)はh(時), m(分), d(日), w(週)を使った文字列で指定できます。
1 2 3 4 5 | self.client.create_database(self.db_name) self.client.create_retention_policy("mypolicy", "100w", 1, default=True) |
データの投入
先に記載した通りのオブジェクトでデータをセットし、InfluxDBClient.write_points()
メソッドを呼びます。返り値は送信処理の成否(bool)です。
1 | res = self.client.write_points(data_) |
ただし、データはリストにする必要があります。例えばこんな感じの。
1 2 3 4 5 6 7 8 9 10 11 12 13 | import datetime data_ = [{'fields': {'metric1': 1.0, 'metric2': -1}, 'measurement': 'testms', 'time': datetime.datetime.utcnow(), 'tags': {'cat1': 'aaa'}}, {'fields': {'metric1': 2.0, 'metric2': -2}, 'measurement': 'testms', 'time': datetime.datetime.utcnow(), 'tags': {'cat1': 'aaa'}}, {'fields': {'metric1': 3.0, 'metric2': -3}, 'measurement': 'testms', 'time': datetime.datetime.utcnow(), 'tags': {'cat1': 'aaa'}}] |
上記のようにタイムスタンプ(UTC)を指定してもいいのですが、省略した場合は受信側のサーバの時刻が使用されます。
情報の表示
サーバに作成されているdatabaseやmeasurement、retention policyを取得するメソッドも用意されています。
1 2 3 | res["database"] = self.client.get_list_database() res["measurement"] = self.client.get_list_measurements() res["retention"] = self.client.get_list_retention_policies() |
取得結果の例はこんな感じ。
1 2 3 4 5 6 7 8 9 10 11 12 13 | {'database': [{'name': '_internal'}, {'name': 'testdb1'}, {'name': 'testdb2'}], 'measurement': [{'name': 'testms1'}, {'name': 'testms2'}], 'retention': [{'default': False, 'duration': '0s', 'name': 'autogen', 'replicaN': 1, 'shardGroupDuration': '168h0m0s'}, {'default': True, 'duration': '16800h0m0s', 'name': 'mypolicy', 'replicaN': 1, 'shardGroupDuration': '168h0m0s'}],... } |
Seriesの表示
InfluxDBではtagの値とmeasurementの値の組み合わせで定義されるデータ系列をseriesと呼びます。
例えばtagにクライアントのホスト名をセットしておけば、ホスト名でデータ系列を抽出するという使い方ができます。
seriesを表示させるメソッドは無いようなので、requestメソッドを使ってクエリを投げます。
1 2 | query_ = "SHOW SERIES ON " + self.db_name buf_ = self.client.request("query", params={"q": query_}) |
内部的にはrequestsモジュールを使っているようで、返り値はrequestsオブジェクトになります。
HTTP API
上記までと同じことをHTTP APIで実行すると、おおよそ次のAPIを使うことになるかと思います。
基本的にSQLぽいクエリを作って送信することでサーバ側と通信します。GETの場合はエンコードしてクエリストリングに、POSTの場合はボディにクエリを渡します。
操作 | URL | メソッド | 送信・受信データ | 説明 |
---|---|---|---|---|
データベース作成 | /query | POST | "q=CREATE DATABASE {db name}" | データベースを作成する |
リテンションポリシ設定 | /query | POST | "q=CREATE RETENTION POLICY {policy name} on {db name} DURATION {span} REPLICATION {num replica} [DEFAULT]" | リテンションポリシを設定する |
データベース取得 | /query?q=SHOW+DATABASES | GET | 名前の一覧、JSON形式 | データベースの一覧を取得する |
Measurement取得 | /query?q=SHOW+MEASUREMENTS+ON+{db_name} | GET | 名前の一覧、JSON形式 | Measurementの一覧を取得する |
リテンションポリシ取得 | /query?q=SHOW+RETENTION+POLICIES+ON+{db_name} | GET | 名前と各設定値のリスト、JSON形式 | 設定済のリテンションポリシを取得する |
Series取得 | /query?q=SHOW+SERIES+ON+{db_name} | GET | キーの一覧、JSON形式 | 格納済のSeries一覧を取得する |
データ投入 | /write?db={db name} | POST | Line Protocolでエンコードした文字列(UTF-8) | サーバにデータを格納する |
使い方はサンプルを見ればわかるかと思います。ふつうのHTTPリクエストなので、Pythonならurllib(Python2.x系ではurllib/urllib2)やrequestsモジュール、他のシェルでもcurlを使ってアクセスできます。
デフォルトでは認証がかかっていませんが、もちろん使い方に合わせて認証を設定することもできます。簡単なやり方ではBasic認証になるため、HTTPだと解読の余地があるという点であまりセキュアではありません。
データの投入(write
)についてはLine Protocolと呼ばれるエンコーディングが必要になります。
Line Protocol
他のドキュメント型DBではJSONなどになることが多いところ、InfluxDBへデータを転送するには、Line Protocolというエンコードが必要になります。
Line Protocolでは、measurement, tag, fields, timeの順で値を1行に押し込めます。
先頭はmeasurement、続くtagとfieldsはkey=value
の形式とし、カンマで区切ります。
これをPythonで書いた例が下記です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | # pointの例 sample_data = {"measurement": "testms", "tags": {"host": "localhost1"}, "fields": {"met1": 1.0, "met2": 2.0}, "time": datetime.datetime.now()} def __encode(self, data_): # encode object to string with line protocol # escape special characters def _esc(raw_): return raw_.replace("\\", "\\\\").replace( " ", "\\ ").replace(",", "\\,").replace("=", "\\=") # set measurement on the top body = data_["measurement"] if "tags" in data_.keys(): tags_ = ",".join(["{}={}".format(_esc(x), _esc(data_["tags"][x])) for x in data_["tags"].keys()]) if tags_ != "": body += "," + tags_ # assuming all value are float number body += " " + ",".join(["{}={}".format(x, data_["fields"][x]) for x in data_["fields"].keys()]) # encode additionally if data has timestamp if "time" in data_.keys(): buf_ = data_["time"].timestamp() body += " {:.9f}".format(buf_).replace(".", "") return body |
クライアント側で時刻を設定する場合、timeはUTCで表現したタイムスタンプのナノ秒表示です。
また、write
APIを使うときには改行(\n
)で区切ることで、複数行のpointを一括で送信することもできます。
1 2 3 4 5 6 7 | payload_ = "" for item in data_: # apply line protocol payload_ += self.__encode(item) + "\n" # send post request res = self.__http_request(self.base_url + "write", data_=payload_, query_={"db": self.db_name}) |
サンプル
今回のPython版サンプルスクリプトです。簡単に実行時の引数で処理を分岐させています。
1 2 3 4 5 6 | # create database > python sample.py make # push sample data > python sample.py send # show stats > python sample.py info |
clientUtility
クラスを定義し、各操作をラップしています。利用可能であればinfluxdb
モジュールをインポートし、無ければurllib
モジュールを経由してHTTP APIを直接呼び出します。Python3.x限定です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | # -*- coding: utf-8 -*- import sys import json from pprint import pprint from datetime import datetime from traceback import print_exc try: from influxdb import InfluxDBClient use_influxc = True except ImportError: import urllib.request import urllib.parse use_influxc = False except BaseException: print_exc() class clientUtility: def __init__(self, db_name, remote_host, remote_port): self.db_name = db_name self.base_url = "http://{}:{}/".format(remote_host, remote_port) if use_influxc: self.client = InfluxDBClient(host=remote_host, port=remote_port, database=db_name) def __encode(self, data_): # encode object to string with line protocol # escape special characters def _esc(raw_): return raw_.replace("\\", "\\\\").replace( " ", "\\ ").replace(",", "\\,").replace("=", "\\=") # set measurement on the top body = data_["measurement"] if "tags" in data_.keys(): tags_ = ",".join(["{}={}".format(_esc(x), _esc(data_["tags"][x])) for x in data_["tags"].keys()]) if tags_ != "": body += "," + tags_ # assuming all value are float number body += " " + ",".join(["{}={}".format(x, data_["fields"][x]) for x in data_["fields"].keys()]) # encode additionally if data has timestamp if "time" in data_.keys(): buf_ = data_["time"].timestamp() body += " {:.9f}".format(buf_).replace(".", "") return body def __http_request(self, url_, data_=None, query_={}): # build url url = url_ if query_ != {}: url += "?{}".format(urllib.parse.urlencode(query_)) if data_ is not None: # post request req = urllib.request.Request(url, data=data_.encode('utf-8'), method="POST") res = urllib.request.urlopen(req) return res.getcode() in [200, 204] else: # get request res = urllib.request.urlopen(url) return json.loads(res.read()) def send(self, data_): if not isinstance(data_, list): raise Exception( "data must must be list (passed:{})".format(type(data_))) if use_influxc: # write_points api returns bool res = self.client.write_points(data_) else: payload_ = "" for item in data_: # apply line protocol payload_ += self.__encode(item) + "\n" # send post request res = self.__http_request(self.base_url + "write", data_=payload_, query_={"db": self.db_name}) # verify result if not res: raise Exception("failed to send data") def create_database(self): if use_influxc: self.client.create_database(self.db_name) self.client.create_retention_policy("mypolicy", "100w", 1, default=True) else: url_ = self.base_url + "query" # send to create database request data_ = "q=CREATE DATABASE {}".format(self.db_name) res = self.__http_request(url_, data_=data_) # check response if not res: raise Exception("failed to create database") # send to add retention policy buf_ = "q=CREATE RETENTION POLICY mypolicy on " + self.db_name buf_ += " DURATION 100w REPLICATION 1 DEFAULT" res = self.__http_request(url_, data_=buf_) # check response if not res: raise Exception("failed to add retention policy") def get_info(self): res = {} if use_influxc: res["database"] = self.client.get_list_database() res["measurement"] = self.client.get_list_measurements() res["retention"] = self.client.get_list_retention_policies() query_ = "SHOW SERIES ON " + self.db_name buf_ = self.client.request("query", params={"q": query_}) try: res["series"] = buf_.json()["results"][0]["series"] except BaseException: res["series"] = {} else: url_ = self.base_url + "query" q = {"database": "SHOW DATABASES", "measurement": "SHOW MEASUREMENTS ON " + self.db_name, "retention": "SHOW RETENTION POLICIES ON " + self.db_name, "series": "SHOW SERIES ON " + self.db_name} for key_ in q.keys(): buf_ = self.__http_request(url_, query_={"q": q[key_]}) try: res[key_] = buf_["results"][0]["series"] except BaseException: print_exc() res[key_] = {} pprint(res) def sample_data(): import random res = {"measurement": "testms", "tags": {"host": "localhost1"}, "fields": {"sample1": random.random(), "sample2": random.random()}} return res if __name__ == "__main__": try: _client = clientUtility(**{"db_name": "testdb", "remote_host": "myserver", "remote_port": 8086}) if sys.argv[1] == "make": _client.create_database() elif sys.argv[1] == "info": _client.get_info() elif sys.argv[1] == "send": _client.send([sample_data() for i in range(5)]) else: raise Exception("unsupported argument") except BaseException: print_exc() |
PowerShellでHTTP APIを呼び出す
認証などを考えなければ、他のWebクライアントでもHTTP APIをたたけます。ここでは簡易にLine Protocolを実装したPowerShellの例を書いておきます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | function writePoint ($content,$config) { # implements line protocol simply $payload = $content.measurement; if ($content.Contains("tags")) { $payload += "," + (($content.tags.Keys | ForEach-Object { "{0}={1}" -f $_,$content.tags[$_] }) -join ","); } $payload += " " + (($content.fields.Keys | ForEach-Object { "{0}={1}" -f $_,$content.fields[$_].ToString() }) -join ","); # send to influxdb server $url = "http://{0}:{1}/write?db={2}" -f $config.remote_host,$config.remote_port,$config.db_name; Write-Host "sending post request..." $url $res = Invoke-WebRequest -Method Post -Uri $url -Body $payload; } $config = @{ "remote_host"="myserver"; "remote_port"="8086"; "db_name"="mydb" } $points = @{ "measurement" = "testms"; "fields" = @{"metric" = 1.0}; "tags" = @{"cat1" = "aaa"} } # write points with HTTP API writePoint $points $config |
おわり。