入力データストリームからやってくるデータを各出力ストリームに応じたクエリを定義することができ、出力するデータを処理することができます。
現時点では扱えるデータリームは以下のとおりになります。
- 入力
- データストリーム
- Event Hubs
- BLOB Storage
- 参照データ
- BLOB Storage
- 出力
- SQL Database
- BLOB Storage
- Event Hubs
- Power BI
- Table Storage
- Service Bus Queue
- Service Bus Topic
- クエリ1 : 入力ストリームをそのまま「SQL Database」テーブルに格納する。
- クエリ2 : 入力ストリームをそのまま「Power BI」に出力する。
- クエリ3 : 入力ストリームを60秒おきに集計し「Power BI」に出力する。
クエリはデータの条件及び集計を定義でき、不要データを省いたり集計結果を別の出力に向けることができます。詳しいクエリの構文はここへ。
■bci-test-streamanalytics-01のクエリ
/* クエリ1
SELECT
*
INTO
[sa2db]
FROM
[eh2sa]
*/
■bci-test-streamanalytics-02のクエリ
/* クエリ2
* デバイスからの生データをPower BI で効果的に表示するために型変換するサンプルクエリ
* - time 列をTIMESTAMP とすることで、System.TimeStamp では、UTCに変換されたtime列が利用される
* - DATEADD()で、9時間進めることで、日本時刻(UTC+9)に変換
* - temperature 列を文字列から数値に変換
*/
WITH transformed AS (
SELECT device,
DATEADD(hour, 9, System.TimeStamp) AS device_time,
CAST(temperature AS float) AS temperature,
nodes
FROM [eh2sa] AS i1 TIMESTAMP BY time
)
SELECT *
INTO [sa2pbi]
FROM [transformed]
-- SELECT INTO 句を追加することで、複数の出力先を指定可能
/*
SELECT *
INTO [sa2pbi2]
FROM [transformed]
*/
/* クエリ3
* 60秒毎に集計し、5秒ずつずらして集計レコードを作成するサンプルクエリ
*/
SELECT
device,
MAX(temperature) AS mx_t,
MIN(temperature) AS mn_t,
AVG(temperature) AS ag_t,
MAX(nodes) AS mx_n,
MIN(nodes) AS mn_n,
AVG(nodes) AS ag_n,
DATEADD(hh, 9, System.TimeStamp) AS tm
INTO [sa2pbi2]
FROM [eh2sa] TIMESTAMP BY time
GROUP BY device, HOPPINGWINDOW(second, 60, 5)
今回はこの辺で、

0 件のコメント:
コメントを投稿