module easymeshed;

// `writeln` is only used from `debug` statements. Import it whenever either
// asserts or debug code are compiled in, so that `-debug` builds with asserts
// disabled still compile.
version (assert)
{
    import std.stdio : writeln;
}
else debug
{
    import std.stdio : writeln;
}

/++
Reads one complete top-level JSON object from `connection` and returns its text.

The stream is consumed in chunks that each end at a `}` byte. Nesting depth is
tracked while scanning the chunks, and braces that occur inside JSON string
literals (including after escaped quotes) are not counted, so a payload such as
`{"msg":"}"}` is framed correctly.

Params:
    connection = anything offering `readUntil(ubyte[])` that returns the bytes
                 before the marker and consumes the marker itself (a vibe.d
                 stream through `vibe.stream.operations.readUntil`, or a mock
                 with a member of that name).

Returns: the text of the object, from the start of the data read up to and
         including the closing brace that brings the depth back to zero.
+/
string readJsonObject(T)(ref T connection)
{
    import vibe.stream.operations : readUntil;

    ubyte[] closingBrace = [cast(ubyte) '}'];

    string text;
    int depth = 0;
    bool inString = false; // currently inside a "..." literal
    bool escaped = false;  // previous character inside the literal was a backslash

    do
    {
        auto segment = cast(string) connection.readUntil(closingBrace);

        // Scan code units, not decoded characters: every character of interest
        // is ASCII and can never be part of a multi-byte UTF-8 sequence.
        foreach (char c; segment)
        {
            if (inString)
            {
                if (escaped)
                    escaped = false;
                else if (c == '\\')
                    escaped = true;
                else if (c == '"')
                    inString = false;
            }
            else if (c == '"')
                inString = true;
            else if (c == '{')
                ++depth;
        }

        // readUntil swallowed the '}' marker; put it back into the text.
        text ~= segment;
        text ~= '}';

        if (inString)
            escaped = false; // the brace is string content; it also ends any pending escape
        else
            --depth;         // the brace closes one object level
    }
    while (depth > 0);

    assert(depth == 0 && !inString, "Object not complete");
    return text;
}

unittest
{
    // Minimal stand-in for a stream: hands out the bytes before the marker and
    // drops the marker, like vibe's readUntil.
    static struct Mock
    {
        ubyte[] msgs;

        auto readUntil(ubyte[] ubs)
        {
            import std.algorithm : findSplit;
            import std.array : array;
            auto splits = msgs.findSplit(ubs);
            msgs = splits[2];
            return splits[0].array;
        }
    }

    // Three objects back to back: flat, nested, and deeply nested.
    auto m = Mock(cast(ubyte[]) (
        `{"dest":0,"from":13610488,"type":5,"subs":[]}`
        ~ `{"dest":13610488,"from":13610488,"type":4,"msg":{"time":180885287,"num":0,"adopt":false}}`
        ~ `{d{d{d}d}d}`).dup);

    assert(readJsonObject(m) == q{{"dest":0,"from":13610488,"type":5,"subs":[]}});
    assert(readJsonObject(m) == q{{"dest":13610488,"from":13610488,"type":4,"msg":{"time":180885287,"num":0,"adopt":false}}});
    assert(readJsonObject(m) == q{{d{d{d}d}d}});

    // Braces and escaped quotes inside string literals must not affect framing.
    auto tricky = Mock(cast(ubyte[]) `{"msg":"a}b{","n":{"x":"\"}"}}{"next":1}`.dup);
    assert(readJsonObject(tricky) == `{"msg":"a}b{","n":{"x":"\"}"}}`);
    assert(readJsonObject(tricky) == `{"next":1}`);
}

/+
Start with a minimal implementation that
takes a port and starts listening. It will assume you are already connected to a node.

We need to start listening and send an empty array of subconnections. It will also need to implement sync.

We should also make a minimal log example program, which just logs all received messages to the command line
+/

/// A single TCP link to a mesh node, exchanging JSON objects as text.
struct EasyMeshConnection
{
    import vibe.data.json;
    import vibe.core.net : TCPConnection;

    /// Connects to `gateway:port`, enables keep-alive and sets a 60 second read timeout.
    this(string gateway, int port)
    {
        import std.conv : to;
        import std.datetime : dur;
        import vibe.core.net : connectTCP;

        connection = connectTCP(gateway, port.to!(ushort));
        connection.keepAlive = true;
        connection.readTimeout = dur!"seconds"(60);
    }

    /// Returns the next JSON object as text, or `"{}"` when no data is pending.
    string readString()
    {
        assert(connection.connected, "Not connected");
        if (!connection.dataAvailableForRead)
            return "{}";
        return connection.readJsonObject();
    }

    /// Whether the underlying TCP connection is still open.
    bool connected()
    {
        return connection.connected;
    }

    /// Reads the next JSON object and returns its members (empty when no data was pending).
    auto read()
    {
        import std.json : parseJSON;
        return parseJSON(readString()).object;
    }

    /// Writes `msg` to the connection as-is.
    void sendMessage(string msg)
    {
        debug writeln("Sending: ", msg);
        assert(connection.connected, "Lost connection when sending");
        connection.write(msg);
    }

private:
    TCPConnection connection;
}

/// Numeric values used in the `type` field of mesh messages.
enum meshPackageType {
    DROP = 3,
    TIME_SYNC = 4,
    NODE_SYNC_REQUEST = 5,
    NODE_SYNC_REPLY = 6,
    BROADCAST = 8, // application data for everyone
    SINGLE = 9     // application data for a single node
}

/// Client for the mesh: keeps a connection to the gateway alive, answers
/// node-sync requests and passes application messages to registered callbacks.
class EasyMesh
{
    import std.json : JSONValue;

    /// Stores the settings and opens the initial connection to `gateway:port`.
    this(string gateway, int port, int nodeID = 1)
    {
        _nodeID = nodeID;
        _gateway = gateway;
        _port = port;
        // Setup initial connection
        newConnection(_gateway, _port);
    }

    /// Stamps `msg` with `dest` and `from` and sends it over the active connection.
    private void sendMessage(long destID, JSONValue msg)
    {
        import std.conv : to;
        msg["dest"] = destID;
        msg["from"] = _nodeID;

        assert("type" in msg, "All messages need to have a type specified");

        // TODO currently this just sends it to the only connection available
        activeConnection().sendMessage(msg.to!string);
    }

    /// Sends `msg` as application data addressed to node `destID`.
    void sendSingle(long destID, string msg)
    {
        // Built through JSONValue so quotes and backslashes in `msg` are escaped
        // instead of corrupting the JSON text.
        auto payload = JSONValue([
            "type": JSONValue(cast(int) meshPackageType.SINGLE),
            "msg": JSONValue(msg)
        ]);
        sendMessage(destID, payload);
    }

    /// Sends `msg` as application data addressed to every node.
    void sendBroadcast(string msg)
    {
        import std.conv : to;

        // Same fields as before ("from", "type", "msg"; no "dest"), but
        // serialised by std.json so that `msg` is escaped properly.
        auto payload = JSONValue([
            "from": JSONValue(_nodeID),
            "type": JSONValue(cast(int) meshPackageType.BROADCAST),
            "msg": JSONValue(msg)
        ]);
        activeConnection().sendMessage(payload.to!string);
    }

    /// Polls every connection for up to 10 ms and handles one pending message each.
    void update()
    {
        import std.datetime : dur;

        ensureConnected();

        // Iterate a snapshot of the keys: handling a message may reconnect and
        // thereby modify `connections`, which is not allowed during a foreach
        // over the associative array itself.
        foreach (id; connections.keys)
        {
            auto entry = id in connections;
            if (entry is null)
                continue; // dropped while handling an earlier message
            auto conn = *entry;
            if (!conn.connected)
                continue; // will be replaced on the next call
            if (conn.connection.waitForData(dur!("msecs")(10)))
                handleMessage(conn.read());
        }
    }

    /// Registers `callBack` to be invoked for received BROADCAST messages and
    /// for SINGLE messages addressed to this node.
    void setReceiveCallback(void delegate(long from, JSONValue[string] msg) callBack )
    {
        callBacks ~= callBack;
    }

    /// Opens a connection, learns the peer's id from its first message,
    /// registers the connection under that id and sends the sync handshake.
    private void newConnection(string gateway, int port)
    {
        import std.exception : enforce;

        debug writeln("Setting up connection with: ", gateway, ":", port);
        auto connection = EasyMeshConnection(gateway, port);

        enforce(connection.connection.waitForData(),
            "Connection closed before the node sent its first message");
        auto json = connection.read();
        auto sender = "from" in json;
        enforce(sender !is null, "First message from the node has no \"from\" field");
        auto connectionID = sender.integer;
        connections[connectionID] = connection;

        // Say hello
        sendMessage(connectionID, syncPacket(meshPackageType.NODE_SYNC_REPLY));
        sendMessage(connectionID, syncPacket(meshPackageType.NODE_SYNC_REQUEST));
    }

    /// Answers sync requests and forwards application data to the callbacks.
    private void handleMessage(JSONValue[string] msg)
    {
        debug writeln("handleMessage: ", msg);

        auto typeField = "type" in msg;
        auto sender = "from" in msg;
        if (typeField is null || sender is null)
            return; // e.g. the "{}" placeholder produced when no data was pending

        auto type = typeField.integer;

        if (type == meshPackageType.NODE_SYNC_REQUEST)
        {
            sendMessage(sender.integer, syncPacket(meshPackageType.NODE_SYNC_REPLY));
        }
        else if (type == meshPackageType.BROADCAST || type == meshPackageType.SINGLE)
        {
            auto dest = "dest" in msg;
            if (type == meshPackageType.SINGLE && (dest is null || dest.integer != _nodeID))
            {
                debug writeln("Received a message not intended for us: ", msg);
            }
            else
            {
                foreach (cb; callBacks)
                    cb(sender.integer, msg);
            }
        }
    }

    /// Builds a sync message of the given type with an empty `subs` list.
    private static JSONValue syncPacket(meshPackageType type)
    {
        import std.conv : to;
        import std.format : format;
        import std.json : parseJSON;
        return parseJSON(format(q{{"type": %s, "subs":[]}}, type.to!int));
    }

    /// Drops closed connections and reconnects to the gateway if none is left.
    private void ensureConnected()
    {
        // `keys` is a copy, so removing entries inside the loop is safe.
        foreach (id; connections.keys)
        {
            if (!connections[id].connected)
                connections.remove(id);
        }
        if (connections.length == 0)
            newConnection(_gateway, _port);
    }

    /// Returns a live connection, reconnecting first when necessary.
    private EasyMeshConnection activeConnection()
    {
        ensureConnected();
        return connections.values[0];
    }

private:
    long _nodeID;
    EasyMeshConnection[long] connections;

    alias callBackDelegate = void delegate(long from, JSONValue[string] msg);
    callBackDelegate[] callBacks;

    string _gateway;
    int _port;
}