Event Hubsの通信プロトコルはAMQPを利用しています。「IBM Bluemix」ではMQTTを採用しています。
Event Hubsへのデータ送信は「.NET」、「Java」環境ともにMicrosoftのサイトにサンプルコードが存在します。下図にある各「Partition」を指定して送信することも可能ですが、特に指定せずに送信を行うと、受信する「Partition」はラウンドロビン的に割り当てられます。
また、「Partition」に格納されたメッセージは順序性が保証されますが、「Partition」間の順序性は保たれないものと思っておいたほうがよさそうです。
Event Hubsで扱うメッセージのトランザクションはサポートされません、これはIoTなどでの利用を考慮したスケーラビリティへの対処と思われます。
このサービスは非常に大量のデータトラフィック1秒あたり数百万メッセージを処理するために存在します。よって、メッセージを受信するEvent Consumerは「.NET」環境では「EventProcesseerHost」クラスが用意されています。
またJava環境ではJMSインターフェイスで各パーティションのConsumerになることが可能ですが、Microsoftは「Apache Storm」でリアルタイム処理をすることを推奨しているようです。
送信データクラス
package foo.bar.data;
/**
* メッセージサンプルデータクラス
*/
public class TestData {
public String device;
public String time;
public String temperature;
public String humidity;
public String nodes;
public TestData(String device, String time, String temperature,
String humidity, String nodes) {
this.device = device;
this.time = time;
this.temperature = temperature;
this.humidity = humidity;
this.nodes = nodes;
}
}
送信クラス
package foo.bar.util;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.Hashtable;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import foo.bar.data.TestData;
public class EventHubSenderOrgMain {
/**
* JMSインターフェイスを用いてEvent Hubsにメッセージを送信するサンプル。
*
* @param args
* @throws NamingException
* @throws JMSException
* @throws IOException
* @throws InterruptedException
*/
public static void send() throws NamingException, JMSException,
IOException, InterruptedException {
// 初期情報
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
env.put(Context.PROVIDER_URL, "servicebus.properties");
// イニシャルコンテキスト作成
Context context = new InitialContext(env);
// コネクションファクトリ生成
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
// 送信先指定
Destination queue = (Destination) context.lookup("EventHub");
// コネクション生成
Connection connection = cf.createConnection();
// Create sender-side Session and MessageProducer
// 送信セッションとメッセージプロデューサ作成
Session sendSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer sender = sendSession.createProducer(queue);
for (int i = 0; i < 20; i++) {
sendBytesMessage(sendSession, sender);
Thread.sleep(200);
}
}
private static void sendBytesMessage(Session sendSession,
MessageProducer sender) throws JMSException,
UnsupportedEncodingException {
// 送信データの作成
BytesMessage message = sendSession.createBytesMessage();
TestData testData = new TestData("DEV-java", new Date().toString(), "28", "70",
"5");
// JSONにする。
message.writeBytes(JsonTranceform.toJson(testData).getBytes("UTF-8"));
// メッセージを送信する。
sender.send(message);
System.out.println("Sent message");
}
public static void main(String[] args) {
try {
EventHubSenderOrgMain.send();
} catch (NamingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
servicebus.properties クラスパスのルートに配備してください。
# servicebus.properties - sample JNDI configuration # Register a ConnectionFactory in JNDI using the form: # connectionfactory.[jndi_name] = [ConnectionURL] connectionfactory.SBCF=amqps://SendRule:[your rule kay]@[your namespace].servicebus.windows.net/?sync-publish=false # Register some queues in JNDI using the form # queue.[jndi_name] = [physical_name] # topic.[jndi_name] = [physical_name] queue.EventHub=[Event Hub Name]
【スケーラビリティに関する考察】
IoTシステムではセンサーデバイスなどの数が多くなることが予想される、デバイスが送信するデータのストア先は今回のサンプルプロジェクトではSQLサーバを用いたが、よりスループットの高いデータストアに蓄積することを考えなければいけないと思いました。
本日はこの辺で、

0 件のコメント:
コメントを投稿