Commit 0ad2659d authored by Vincent Charpentier's avatar Vincent Charpentier
Browse files

feat: version 1.0

A working publisher and subscriber for ZMQ.
parent 5cf89f14
......@@ -23,6 +23,22 @@
<version>2.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.json/json -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20220320</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.googlecode.json-simple/json-simple -->
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1</version>
</dependency>
......
import netscape.javascript.JSObject;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
import zmqpubsub.ZmqPublisher;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.util.concurrent.TimeUnit;
public class Main
public class MainPublisher
{
public static void main(String[] args) throws Exception
{
String jsonString = "{'test1':'value1','test2':{'id':0,'name':'testName'}}";
JsonObject jsonObject = (JsonObject) JsonParser.parseString(jsonString);
System.out.println(jsonObject);
ZmqPublisher publisher = new ZmqPublisher("127.0.0.1", "2001");
ZmqPublisher publisher = new ZmqPublisher("127.0.0.1", "2001");
for(int i=0; i<10; i++)
{
publisher.publish("allData", jsonObject);
System.out.println("published");
System.out.println("Sent!");
TimeUnit.SECONDS.sleep(1);
}
}
}
import com.google.gson.JsonObject;
import zmqpubsub.ZmqSubscriber;
public class mainSubscriber
{
protected static void handleIncomingData(JsonObject jsonObject)
{
System.out.println(jsonObject);
// you can uncomment this if you are working with the included publisher.
// System.out.println(jsonObject.get("test2"));
// JsonElement test2 = jsonObject.get("test2");
// JsonObject jsonObject1 = test2.getAsJsonObject();
// System.out.println(jsonObject1.get("id"));
// JsonElement id1 = jsonObject1.get("id");
// int id1AsInt = id1.getAsInt();
// System.out.println(id1AsInt);
}
public static void main(String[] args)
{
// ZmqSubscriber subscriber = new ZmqSubscriber("127.0.0.1", "2001");
ZmqSubscriber subscriber = new ZmqSubscriber("193.190.127.147", "2001"); // To test it with VS8
subscriber.subscribe("allData", mainSubscriber::handleIncomingData);
}
}
package zmqpubsub;
import com.google.gson.JsonObject;
import netscape.javascript.JSObject;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
public class ZmqPublisher
{
......@@ -26,7 +26,6 @@ public class ZmqPublisher
public void publish(String topic, JsonObject payload)
{
System.out.println(payload);
socket.send(topic+":"+payload);
}
}
package zmqpubsub;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import java.util.function.Consumer;
public class ZmqSubscriber
{
public ZContext context;
public ZMQ.Socket socket;
public String bind;
public ZmqSubscriber(String ip, String port)
{
context = new ZContext();
socket = context.createSocket(SocketType.SUB);
bind = "tcp://"+ ip + ":" + port;
}
public void subscribe (String topic, Consumer<JsonObject> consumer)
{
socket.connect(bind);
socket.subscribe(topic);
new Thread(() ->
{
while (true)
{
byte[] recv = socket.recv();
String receivedStringMessage = new String(recv);
// s1 contains everything after = in the original string (i.e. =....) therefore +1 to remove the =
JsonObject jsonObject = (JsonObject) JsonParser.parseString(receivedStringMessage.substring(receivedStringMessage.indexOf(":") + 1));
consumer.accept(jsonObject);
}
}).start();
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment