1 module easymeshed;
2 
3 version (assert) {
4     import std.stdio : writeln;
5 }
6 
/**
 * Reads one complete JSON object from `connection` and returns its text.
 *
 * The stream is consumed in segments that each end at a `}` (using
 * `readUntil`) until every brace opened so far has been closed again.
 * Braces that occur inside JSON string literals are not counted, so a
 * payload such as `{"msg":"}"}` is returned in one piece.
 *
 * Params:
 *   connection = any value offering `readUntil(ubyte[])` that returns the
 *                bytes before the marker and consumes the marker itself
 *                (a vibe.d stream, or a stand-in with such a member).
 *
 * Returns: the text from the current read position up to and including the
 *          brace that closes the outermost object.
 */
string readJsonObject(T)(ref T connection)
{
    import vibe.stream.operations : readUntil;

    int depth = 0;          // number of '{' still waiting for their '}'
    bool inString = false;  // currently inside a "..." literal
    bool escaped = false;   // previous character inside a string was '\'
    string line;
    do
    {
        auto segment = cast(string) connection.readUntil([cast(ubyte) '}']);
        foreach (char c; segment)
        {
            if (inString)
            {
                if (escaped)
                    escaped = false;
                else if (c == '\\')
                    escaped = true;
                else if (c == '"')
                    inString = false;
            }
            else if (c == '"')
                inString = true;
            else if (c == '{')
                ++depth;
        }

        // readUntil swallowed the '}' marker; put it back into the text.
        line ~= segment ~ "}";
        if (inString)
            escaped = false; // the '}' was ordinary string content
        else
            --depth;         // the '}' closed an object
    } while (depth > 0 || inString);

    // A negative depth means a '}' arrived before any '{' was seen.
    assert(depth == 0, "Object not complete");
    return line;
}
27 
unittest
{
    import std.array : array;

    // Stand-in for a stream: hands out an endlessly repeated sequence of three
    // back-to-back JSON-like objects (truncated to 1000 bytes) through the
    // same `readUntil` operation that readJsonObject relies on.
    struct FakeStream
    {
        import std.range : cycle, take;

        ubyte[] pending = [
            // {"dest":0,"from":13610488,"type":5,"subs":[]}
            123, 34, 100, 101, 115, 116, 34, 58, 48, 44, 34, 102, 114, 111, 109, 34, 58, 49, 51, 54, 49, 48, 52, 56, 56, 44, 34, 116, 121, 112, 101, 34, 58, 53, 44, 34, 115, 117, 98, 115, 34, 58, 91, 93, 125,
            // {"dest":13610488,"from":13610488,"type":4,"msg":{"time":180885287,"num":0,"adopt":false}}
            123, 34, 100, 101, 115, 116, 34, 58, 49, 51, 54, 49, 48, 52, 56, 56, 44, 34, 102, 114, 111, 109, 34, 58, 49, 51, 54, 49, 48, 52, 56, 56, 44, 34, 116, 121, 112, 101, 34, 58, 52, 44, 34, 109, 115, 103, 34, 58, 123, 34, 116, 105, 109, 101, 34, 58, 49, 56, 48, 56, 56, 53, 50, 56, 55, 44, 34, 110, 117, 109, 34, 58, 48, 44, 34, 97, 100, 111, 112, 116, 34, 58, 102, 97, 108, 115, 101, 125, 125,
            // {d{d{d}d}d}
            123, 100, 123, 100, 123, 100, 125, 100, 125, 100, 125
        ].cycle.take(1000).array;

        // Returns the bytes in front of `marker` and drops them, together
        // with the marker, from the pending data.
        auto readUntil(ubyte[] marker)
        {
            import std.algorithm : findSplit;
            auto parts = pending.findSplit(marker);
            pending = parts[2];
            return parts[0].array;
        }
    }

    auto stream = FakeStream();

    // Flat object.
    assert(readJsonObject(stream) == q{{"dest":0,"from":13610488,"type":5,"subs":[]}});
    // Object with one nested object.
    assert(readJsonObject(stream) == q{{"dest":13610488,"from":13610488,"type":4,"msg":{"time":180885287,"num":0,"adopt":false}}});
    // Three levels of nesting.
    assert(readJsonObject(stream) == q{{d{d{d}d}d}});
}
53 
54 /+ 
55 Start with a minimal implementation that takes a port and starts listening. It will assume you are already connected to a node.
56 
57 We need to start listening and send an empty array of subconnections. It will also need to implement sync.
58 
59 We should also make a minimal log example program, which just logs all received messages to the command line
60 +/
/// One TCP link to a mesh node, exchanging JSON objects as text.
struct EasyMeshConnection
{
    import vibe.data.json;
    import vibe.core.net : TCPConnection;

    /// Connects to `gateway`:`port`, enables keep-alive and sets a
    /// 60 second read timeout.
    this(string gateway, int port)
    {
        import std.conv : to;
        import std.datetime : dur;
        import vibe.core.net : connectTCP;

        connection = connectTCP(gateway, port.to!(ushort));
        connection.keepAlive = true;
        connection.readTimeout = dur!"seconds"(60);
    }

    /// Returns the text of the next JSON object on the connection, or the
    /// empty object "{}" when no data is currently available for reading.
    string readString()
    {
        assert(connection.connected, "Not connected");
        return connection.dataAvailableForRead
            ? connection.readJsonObject()
            : "{}";
    }

    /// Whether the underlying TCP connection is still connected.
    bool connected()
    {
        return connection.connected;
    }

    /// Reads the next JSON object (see readString) and returns it parsed
    /// into its key/value pairs.
    auto read()
    {
        import std.json : parseJSON;
        return parseJSON(readString()).object;
    }

    /// Writes `msg` to the connection as is.
    void sendMessage(string msg)
    {
        debug writeln("Sending: ", msg);
        assert(connection.connected, "Lost connection when sending");
        connection.write(msg);
    }

private:
    TCPConnection connection;
}
108 
/// Numeric codes used in the "type" field of mesh messages.
enum meshPackageType
{
    DROP = 3,
    TIME_SYNC = 4,
    NODE_SYNC_REQUEST = 5,
    NODE_SYNC_REPLY = 6,
    /// Application data for everyone.
    BROADCAST = 8,
    /// Application data for a single node.
    SINGLE = 9
}
117 
118 
/**
 * Minimal mesh client: keeps a connection to one gateway node, answers node
 * sync requests, and passes received broadcast/single messages on to the
 * registered callbacks.
 */
class EasyMesh
{
    import std.json : JSONValue;
    // The debug traces below use writeln; import it here so they do not rely
    // on the module-level import, which is only active under version(assert).
    debug import std.stdio : writeln;

    /**
     * Connects to the gateway and performs the initial node sync exchange.
     *
     * Params:
     *   gateway = host name or address of the node to connect to
     *   port    = TCP port of that node
     *   nodeID  = ID this client uses in the "from" field of its messages
     */
    this(string gateway, int port, int nodeID = 1)
    {
        _nodeID = nodeID;
        // Setup initial connection
        _gateway = gateway;
        _port = port;
        newConnection(_gateway, _port);
    }

    /// Fills in "dest" and "from" and sends `msg`, reconnecting to the
    /// gateway first when no live connection is left.
    private void sendMessage(long destID, JSONValue msg)
    {
        msg["dest"] = destID;
        msg["from"] = _nodeID;

        assert("type" in msg, "All messages need to have a type specified");

        // TODO currently this just sends it to the only connection available
        auto destination = liveConnection();
        destination.sendMessage(msg.toString());
    }

    /// Sends `msg` as application data addressed to node `destID`.
    void sendSingle(long destID, string msg)
    {
        // Build the object instead of formatting text, so quotes and
        // backslashes inside `msg` are escaped properly.
        auto payload = JSONValue([
            "type": JSONValue(cast(long) meshPackageType.SINGLE),
            "msg": JSONValue(msg)]);
        sendMessage(destID, payload);
    }

    /// Sends `msg` as application data for every node. The message carries
    /// "from", "type" and "msg" but no "dest" field.
    void sendBroadcast(string msg)
    {
        // Built as a JSONValue for the same escaping reason as sendSingle.
        auto payload = JSONValue([
            "from": JSONValue(_nodeID),
            "type": JSONValue(cast(long) meshPackageType.BROADCAST),
            "msg": JSONValue(msg)]);
        auto destination = liveConnection();
        destination.sendMessage(payload.toString());
    }

    /**
     * Polls every connection for up to 10 ms and handles a message when one
     * arrives. Connections that are no longer connected are dropped; when
     * none remain afterwards a new connection to the gateway is opened.
     */
    void update()
    {
        import std.datetime : dur;

        // `keys` is a snapshot: handling a message may send a reply, which
        // may reconnect and change `connections`. Changing an associative
        // array while iterating it directly is not allowed.
        foreach (id; connections.keys)
        {
            auto entry = id in connections;
            if (entry is null)
                continue; // removed while an earlier message was handled

            auto conn = *entry;
            if (!conn.connected)
            {
                connections.remove(id);
                continue;
            }
            if (conn.connection.waitForData(dur!("msecs")(10)))
                handleMessage(conn.read());
        }

        if (connections.length == 0)
            newConnection(_gateway, _port);
    }

    /// Registers a callback that is invoked with the sender ID and the full
    /// message for each received broadcast, and for each single message
    /// addressed to this node.
    void setReceiveCallback(void delegate(long from, JSONValue[string] msg) callBack )
    {
        callBacks ~= callBack;
    }

    /// Opens a connection, waits for the first message to learn the ID of
    /// the node on the other side, stores the connection under that ID and
    /// sends a node sync reply followed by a node sync request.
    private void newConnection(string gateway, int port)
    {
        import std.exception : enforce;
        debug writeln("Setting up connection with: ", gateway, ":", port);
        auto connection = EasyMeshConnection(gateway, port);

        connection.connection.waitForData;
        auto json = connection.read();
        // read() yields an empty object when nothing arrived; report that
        // clearly instead of failing on the missing key.
        auto sender = "from" in json;
        enforce(sender !is null,
            "No \"from\" field in the first message from the gateway");
        auto connectionID = sender.integer;
        connections[connectionID] = connection;

        // Say hello
        sendMessage(connectionID,
            nodeSyncMessage(meshPackageType.NODE_SYNC_REPLY));
        sendMessage(connectionID,
            nodeSyncMessage(meshPackageType.NODE_SYNC_REQUEST));
    }

    /// Reacts to one received message: answers node sync requests and hands
    /// broadcast/single messages to the callbacks. Single messages for
    /// another node and messages without a "type" are ignored.
    private void handleMessage(JSONValue[string] msg)
    {
        debug writeln("handleMessage: ", msg);
        auto typeField = "type" in msg;
        if (typeField is null)
        {
            // e.g. the "{}" placeholder returned when no data was available
            debug writeln("Ignoring message without a type: ", msg);
            return;
        }
        immutable type = typeField.integer;

        if (type == meshPackageType.NODE_SYNC_REQUEST)
        {
            sendMessage(msg["from"].integer,
                nodeSyncMessage(meshPackageType.NODE_SYNC_REPLY));
        }
        else if (type == meshPackageType.BROADCAST || type == meshPackageType.SINGLE)
        {
            auto dest = "dest" in msg;
            immutable forOtherNode = type == meshPackageType.SINGLE
                && (dest is null || dest.integer != _nodeID);
            if (forOtherNode)
            {
                debug writeln("Received a message not intended for us: ", msg);
            }
            else
            {
                foreach (cb; callBacks)
                    cb(msg["from"].integer, msg);
            }
        }
    }

    /// Returns a connection that is currently connected, opening a new one
    /// to the gateway when none is left.
    private EasyMeshConnection liveConnection()
    {
        pruneDeadConnections();
        if (connections.length == 0)
            newConnection(_gateway, _port);
        return connections.values[0];
    }

    /// Removes every connection that is no longer connected.
    private void pruneDeadConnections()
    {
        // Collect first: entries must not be removed while iterating.
        long[] dead;
        foreach (id, ref conn; connections)
        {
            if (!conn.connected)
                dead ~= id;
        }
        foreach (id; dead)
            connections.remove(id);
    }

    /// Builds a node sync message of the given type with an empty list of
    /// sub-connections.
    private static JSONValue nodeSyncMessage(meshPackageType type)
    {
        import std.conv : to;
        import std.format : format;
        import std.json : parseJSON;
        return parseJSON(format(q{{"type": %s, "subs":[]}}, type.to!int));
    }

private:
    long _nodeID;
    EasyMeshConnection[long] connections;

    alias callBackDelegate = void delegate(long from, JSONValue[string] msg);
    callBackDelegate[] callBacks;

    string _gateway;
    int _port;
}