InfluxDBへデータを登録する

かっこいい名前のInfluxDB、時系列(time series)データベースとして人気のようです。もちろん各種認証でアクセス制限をかけることができますが、HTTP APIを持っており特に作りこまなくてもデータを転送できるという手軽さがあります。そして速い(個人の感想です)。

今回はPython3とPowerShellでデータを投入するためのクライアントを作成してみます。InfluxDBのインストールは別記事で。検証環境はWindows 10 Pro + Python 3.6です。

もくじ:

InfluxDBの基本的な使い方

(フラットな)JSON形式でデータを挿入していくところは他のドキュメント型DBと似ているものの、スキーマには時系列データを扱うためにやや特殊な制約があります。

基本的なデータ構造は下記のようなスキーマ。measurementfieldsは必須です。

よくmeasurementはRDBでいうところのテーブルに相当すると説明されています。tagはインデクスされないフィルタ用のカラム、timeはそのままタイムスタンプですね。データの値そのものをfieldsに指定します。

データの有効期間(やレプリケーション設定)はretention policyと呼ばれる設定により定義されており、初期状態でデフォルトのポリシが構成されています。

データを送信するところまでできればいいので、検索クエリを直接使うケースについて今回は省略します。また、他のDBMSと同様に複数インスタンスによるクラスタリングも可能ですが、今回はシングルノードを想定します。

PythonでのInfluxDBクライアントモジュール

サードパーティで開発されているinfluxdbモジュール(InfluxDB-Python)はpip経由でインストールできます。

手元で試したところバージョンは5.2.0が入りました。

HTTP APIをラップしており、内部的にはrequestsモジュールを使っているようです。
サンプルの全体は後述しますが、クライアント実装用にクラスclientUtility()を定義しました。

influxdbモジュールのクライアント機能はinfluxdb.InfluxDBClient()オブジェクト経由で利用します。

InfluxDBClientオブジェクト

少なくともInfluxDBサーバのアドレス(ホスト名)を指定するのがよくある使い方だと思います。省略すると自ホストに接続しに行きます。

以降はこのオブジェクトのメソッドを呼び出してサーバを操作することになります。

データベースの作成

このあたりも使い方は見た通りですね。ついでにポリシも設定しておきます。
有効期限(duration)はh(時), m(分), d(日), w(週)を使った文字列で指定できます。

データの投入

先に記載した通りのオブジェクトでデータをセットし、InfluxDBClient.write_points()メソッドを呼びます。返り値は送信処理の成否(bool)です。

ただし、データはリストにする必要があります。例えばこんな感じの。

上記のようにタイムスタンプ(UTC)を指定してもいいのですが、省略した場合は受信側のサーバの時刻が使用されます。

情報の表示

サーバに作成されているdatabaseやmeasurement、retention policyを取得するメソッドも用意されています。

取得結果の例はこんな感じ。

Seriesの表示

InfluxDBではtagの値とmeasurementの値の組み合わせで定義されるデータ系列をseriesと呼びます。
例えばtagにクライアントのホスト名をセットしておけば、ホスト名でデータ系列を抽出するという使い方ができます。

seriesを表示させるメソッドは無いようなので、requestメソッドを使ってクエリを投げます。

内部的にはrequestsモジュールを使っているようで、返り値はrequestsオブジェクトになります。

HTTP API

上記までと同じことをHTTP APIで実行すると、おおよそ次のAPIを使うことになるかと思います。
基本的にSQLぽいクエリを作って送信することでサーバ側と通信します。GETの場合はエンコードしてクエリストリングに、POSTの場合はボディにクエリを渡します。

操作URLメソッド送信・受信データ説明
データベース作成/queryPOST"q=CREATE DATABASE {db name}"データベースを作成する
リテンションポリシ設定/queryPOST"q=CREATE RETENTION POLICY {policy name} on {db name} DURATION {span} REPLICATION {num replica} [DEFAULT]"リテンションポリシを設定する
データベース取得/query?q=SHOW+DATABASESGET名前の一覧、JSON形式データベースの一覧を取得する
Measurement取得/query?q=SHOW+MEASUREMENTS+ON+{db_name}GET名前の一覧、JSON形式Measurementの一覧を取得する
リテンションポリシ取得/query?q=SHOW+RETENTION+POLICIES+ON+{db_name}GET名前と各設定値のリスト、JSON形式設定済のリテンションポリシを取得する
Series取得/query?q=SHOW+SERIES+ON+{db_name}GETキーの一覧、JSON形式格納済のSeries一覧を取得する
データ投入/write?db={db name}POSTLine Protocolでエンコードした文字列(UTF-8)サーバにデータを格納する

使い方はサンプルを見ればわかるかと思います。ふつうのHTTPリクエストなので、Pythonならurllib(Python2.x系ではurllib/urllib2)やrequestsモジュール、他のシェルでもcurlを使ってアクセスできます。
デフォルトでは認証がかかっていませんが、もちろん使い方に合わせて認証を設定することもできます。簡単なやり方ではBasic認証になるため、HTTPだと解読の余地があるという点であまりセキュアではありません。

データの投入(write)についてはLine Protocolと呼ばれるエンコーディングが必要になります。

Line Protocol

他のドキュメント型DBではJSONなどになることが多いところ、InfluxDBへデータを転送するには、Line Protocolというエンコードが必要になります。

Line Protocolでは、measurement, tag, fields, timeの順で値を1行に押し込めます。
先頭はmeasurement、続くtagとfieldsはkey=valueの形式とし、カンマで区切ります。

これをPythonで書いた例が下記です。

クライアント側で時刻を設定する場合、timeはUTCで表現したタイムスタンプのナノ秒表示です。
また、write APIを使うときには改行(\n)で区切ることで、複数行のpointを一括で送信することもできます。

サンプル

今回のPython版サンプルスクリプトです。簡単に実行時の引数で処理を分岐させています。

clientUtilityクラスを定義し、各操作をラップしています。利用可能であればinfluxdbモジュールをインポートし、無ければurllibモジュールを経由してHTTP APIを直接呼び出します。Python3.x限定です。

PowerShellでHTTP APIを呼び出す

認証などを考えなければ、他のWebクライアントでもHTTP APIをたたけます。ここでは簡易にLine Protocolを実装したPowerShellの例を書いておきます。

おわり。