2015/08/13

クラウドで「IoT」ことはじめ - サービス編 - Stream Analytics

 Stream Analyticsはセンサーデバイスやフロントアプリケーションなどから取り留めも無くやってくるストリームデータを処理するサービスです。

 入力データストリームからやってくるデータを各出力ストリームに応じたクエリを定義することができ、出力するデータを処理することができます。

現時点では扱えるデータリームは以下のとおりになります。



  • 入力
    • データストリーム
      • Event Hubs
      • BLOB Storage
    • 参照データ
      • BLOB Storage
  • 出力
    • SQL Database
    • BLOB Storage
    • Event Hubs
    • Power BI
    • Table Storage
    • Service Bus Queue
    • Service Bus Topic


  • クエリ1 : 入力ストリームをそのまま「SQL Database」テーブルに格納する。
  • クエリ2 : 入力ストリームをそのまま「Power BI」に出力する。
  • クエリ3 : 入力ストリームを60秒おきに集計し「Power BI」に出力する。
 クエリはデータの条件及び集計を定義でき、不要データを省いたり集計結果を別の出力に向けることができます。詳しいクエリの構文はここへ。

■bci-test-streamanalytics-01のクエリ
/* クエリ1
SELECT
    *
INTO
    [sa2db]
FROM
    [eh2sa]  
 */
■bci-test-streamanalytics-02のクエリ
/* クエリ2
 * デバイスからの生データをPower BI で効果的に表示するために型変換するサンプルクエリ
 * - time 列をTIMESTAMP とすることで、System.TimeStamp では、UTCに変換されたtime列が利用される
 * - DATEADD()で、9時間進めることで、日本時刻(UTC+9)に変換
 * - temperature 列を文字列から数値に変換
 */
WITH transformed AS ( 
  SELECT device,
    DATEADD(hour, 9, System.TimeStamp) AS device_time,
    CAST(temperature AS float) AS temperature,
    nodes
  FROM [eh2sa] AS i1 TIMESTAMP BY time
)

SELECT *
INTO [sa2pbi]
FROM [transformed]

-- SELECT INTO 句を追加することで、複数の出力先を指定可能
/*
SELECT *
INTO [sa2pbi2]
FROM [transformed]
*/

/* クエリ3
 * 60秒毎に集計し、5秒ずつずらして集計レコードを作成するサンプルクエリ
 */
SELECT
  device,
  MAX(temperature) AS mx_t,
  MIN(temperature) AS mn_t,
  AVG(temperature) AS ag_t,
  MAX(nodes) AS mx_n,
  MIN(nodes) AS mn_n,
  AVG(nodes) AS ag_n,
  DATEADD(hh, 9, System.TimeStamp) AS tm
INTO [sa2pbi2]
FROM [eh2sa] TIMESTAMP BY time
GROUP BY device, HOPPINGWINDOW(second, 60, 5)

今回はこの辺で、

0 件のコメント:

コメントを投稿