Event Hubsの通信プロトコルはAMQPを利用しています。「IBM Bluemix」ではMQTTを採用しています。
Event Hubsへのデータ送信は「.NET」、「Java」環境ともにMicrosoftのサイトにサンプルコードが存在します。下図にある各「Partition」を指定して送信することも可能ですが、特に指定せずに送信を行うと、受信する「Partition」はラウンドロビン的に割り当てられます。
また、「Partition」に格納されたメッセージは順序性が保証されますが、「Partition」間の順序性は保たれないものと思っておいたほうがよさそうです。
Event Hubsで扱うメッセージのトランザクションはサポートされません、これはIoTなどでの利用を考慮したスケーラビリティへの対処と思われます。
このサービスは非常に大量のデータトラフィック1秒あたり数百万メッセージを処理するために存在します。よって、メッセージを受信するEvent Consumerは「.NET」環境では「EventProcesseerHost」クラスが用意されています。
またJava環境ではJMSインターフェイスで各パーティションのConsumerになることが可能ですが、Microsoftは「Apache Storm」でリアルタイム処理をすることを推奨しているようです。
送信データクラス
package foo.bar.data; /** * メッセージサンプルデータクラス */ public class TestData { public String device; public String time; public String temperature; public String humidity; public String nodes; public TestData(String device, String time, String temperature, String humidity, String nodes) { this.device = device; this.time = time; this.temperature = temperature; this.humidity = humidity; this.nodes = nodes; } }
送信クラス
package foo.bar.util; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Date; import java.util.Hashtable; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import foo.bar.data.TestData; public class EventHubSenderOrgMain { /** * JMSインターフェイスを用いてEvent Hubsにメッセージを送信するサンプル。 * * @param args * @throws NamingException * @throws JMSException * @throws IOException * @throws InterruptedException */ public static void send() throws NamingException, JMSException, IOException, InterruptedException { // 初期情報 Hashtableenv = new Hashtable (); env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory"); env.put(Context.PROVIDER_URL, "servicebus.properties"); // イニシャルコンテキスト作成 Context context = new InitialContext(env); // コネクションファクトリ生成 ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF"); // 送信先指定 Destination queue = (Destination) context.lookup("EventHub"); // コネクション生成 Connection connection = cf.createConnection(); // Create sender-side Session and MessageProducer // 送信セッションとメッセージプロデューサ作成 Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer sender = sendSession.createProducer(queue); for (int i = 0; i < 20; i++) { sendBytesMessage(sendSession, sender); Thread.sleep(200); } } private static void sendBytesMessage(Session sendSession, MessageProducer sender) throws JMSException, UnsupportedEncodingException { // 送信データの作成 BytesMessage message = sendSession.createBytesMessage(); TestData testData = new TestData("DEV-java", new Date().toString(), "28", "70", "5"); // JSONにする。 message.writeBytes(JsonTranceform.toJson(testData).getBytes("UTF-8")); // メッセージを送信する。 sender.send(message); System.out.println("Sent message"); } public static void main(String[] args) { try { EventHubSenderOrgMain.send(); } catch (NamingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
servicebus.properties クラスパスのルートに配備してください。
# servicebus.properties - sample JNDI configuration # Register a ConnectionFactory in JNDI using the form: # connectionfactory.[jndi_name] = [ConnectionURL] connectionfactory.SBCF=amqps://SendRule:[your rule kay]@[your namespace].servicebus.windows.net/?sync-publish=false # Register some queues in JNDI using the form # queue.[jndi_name] = [physical_name] # topic.[jndi_name] = [physical_name] queue.EventHub=[Event Hub Name]
【スケーラビリティに関する考察】
IoTシステムではセンサーデバイスなどの数が多くなることが予想される、デバイスが送信するデータのストア先は今回のサンプルプロジェクトではSQLサーバを用いたが、よりスループットの高いデータストアに蓄積することを考えなければいけないと思いました。
本日はこの辺で、
0 件のコメント:
コメントを投稿