UartBus source documentation
UartBus.java
1 package eu.javaexperience.electronic.uartbus.rpc.client;
2 
3 import java.io.Closeable;
4 import java.io.IOException;
5 import java.util.Arrays;
6 import java.util.LinkedList;
7 import java.util.concurrent.TimeUnit;
8 
9 import eu.javaexperience.asserts.AssertArgument;
10 import eu.javaexperience.datastorage.TransactionException;
16 import eu.javaexperience.exceptions.IllegalOperationException;
17 import eu.javaexperience.interfaces.simple.SimpleGet;
18 import eu.javaexperience.interfaces.simple.publish.SimplePublish1;
19 import eu.javaexperience.io.IOTools;
20 import eu.javaexperience.log.JavaExperienceLoggingFacility;
21 import eu.javaexperience.log.LogLevel;
22 import eu.javaexperience.log.Loggable;
23 import eu.javaexperience.log.Logger;
24 import eu.javaexperience.log.LoggingTools;
25 import eu.javaexperience.measurement.MeasurementSerie;
26 import eu.javaexperience.multithread.notify.WaitForSingleEvent;
27 import eu.javaexperience.patterns.behavioral.mediator.EventMediator;
28 import eu.javaexperience.semantic.references.MayNull;
29 
30 public class UartBus implements Closeable
31 {
32  protected static final Logger LOG = JavaExperienceLoggingFacility.getLogger(new Loggable("UartBus"));
33 
34  //TODO abstract stream pair to make capable to connect directly to the
35  //bus through ttyUSBX
36 
37  protected Closeable connResource;
38  protected int fromAddress;
39 
40  protected SimplePublish1<byte[]> sendPacket;
41 
42  protected final EventMediator<ParsedUartBusPacket> onNewValidPackageReceived = new EventMediator<>();
43 
44  protected final LinkedList<UartbusTransaction> pendingRequests = new LinkedList<>();
45 
46  protected final EventMediator<byte[]> invalidPackets = new EventMediator<>();
47 
48  protected final EventMediator<byte[]> unrelatedPackets = new EventMediator<>();
49 
50  public UartBus
51  (
52  @MayNull Closeable resource,
53  SimplePublish1<byte[]> sendPacket,
54  int fromAddress
55  )
56  {
57  this.connResource = resource;
58  this.sendPacket = sendPacket;
59  this.fromAddress = fromAddress;
60 
61  onNewValidPackageReceived.addEventListener
62  (
63  (a)->
64  {
65  synchronized(pendingRequests)
66  {
67  for(UartbusTransaction pr:pendingRequests)
68  {
69  if(pr.tryAcceptResponse(a))
70  {
71  pendingRequests.remove(pr);
72  return;
73  }
74  }
75  }
76 
77  unrelatedPackets.dispatchEvent(a.rawPacket);
78  }
79  );
80  }
81 
82  public EventMediator<byte[]> getInvalidPacketListener()
83  {
84  return invalidPackets;
85  }
86 
87  public EventMediator<byte[]> getUnrelatedPacketListener()
88  {
89  return unrelatedPackets;
90  }
91 
92  public void processPacket(byte[] packet)
93  {
94  if(null == packet)
95  {
96  return;
97  }
98 
99  byte[] data = UartbusTools.getValidPacket(packet);
100 
101  if(null != data)
102  {
103  ParsedUartBusPacket p = null;
104 
105  try
106  {
107  p = new ParsedUartBusPacket(data);
108  }
109  catch(Exception e)
110  {
111  LoggingTools.tryLogFormatException(LOG, LogLevel.NOTICE, e, "Exception while parsing packet ");
112  }
113 
114  if(null != p)
115  {
116  onNewValidPackageReceived.dispatchEvent(p);
117  }
118  }
119  else
120  {
121  invalidPackets.dispatchEvent(packet);
122  }
123  }
124 
125  public void addPendingRequest(UartbusTransaction req)
126  {
127  synchronized(pendingRequests)
128  {
129  if(pendingRequests.contains(req))
130  {
131  throw new IllegalOperationException("Transaction already added to pending requests.");
132  }
133  pendingRequests.add(req);
134  }
135  }
136 
137  public boolean revokePendingRequest(UartbusTransaction req)
138  {
139  synchronized(pendingRequests)
140  {
141  return pendingRequests.remove(req);
142  }
143  }
144 
145  public UartbusTransaction newTransaction
146  (
147  int to,
148  byte[] path,
149  byte[] payloadData,
150  boolean zeroNamespaceAnswer
151  )
152  throws IOException
153  {
154  UartbusTransaction req = new UartbusTransaction(this);
155  req.from = fromAddress;
156  req.to = to;
157  req.path = path;
158  req.payload = payloadData;
159  req.zeroNamespaceAnswer = zeroNamespaceAnswer;
160  req.send();
161  return req;
162  }
163 
164  public UartbusTransaction subscribeResponse(int from, int to, byte[] path, boolean zeroNamespace)
165  {
166  UartbusTransaction ret = new UartbusTransaction(this);
167  ret.from = from;
168  ret.to = to;
169  ret.path = path;
170  ret.zeroNamespaceAnswer = zeroNamespace;
171  addPendingRequest(ret);
172  return ret;
173  }
174 
175  public static UartBus fromTcp(String ip, int port, int fromAddress) throws IOException
176  {
177  UartbusStreamerEndpoint conn = UartbusRpcClientTools.openIpEndpoint(ip, port, null, true);
178  UartBus ret = new UartBus(conn, conn::sendPacket, fromAddress);
179  conn.getPacketStreamer().addEventListener(ret::processPacket);
180  conn.startStreaming();
181  return ret;
182  }
183 
184  @Override
185  public void close() throws IOException
186  {
187  IOTools.silentClose(connResource);
188  }
189 
190  public int getFromAddress()
191  {
192  return fromAddress;
193  }
194 
195  public UartBusDevice device(int addr)
196  {
197  return new UartBusDevice(this, addr);
198  }
199 
200  public void sendRawPacket(byte[] data)
201  {
202  sendPacket.publish(data);
203  }
204 
205  public static UartBus fromConnection(UartbusConnection srv, boolean closeConnOnClose, int busFromAddress)
206  {
207  return fromConnection
208  (
209  ()->srv,
210  ()->
211  {
212  IOTools.silentClose(srv);
213  if(closeConnOnClose)
214  {
215  IOTools.silentClose(srv);
216  }
217  },
218  busFromAddress
219  );
220  }
221 
222  public static UartBus fromConnection(SimpleGet<UartbusConnection> gSrv, Closeable onClose, int busFromAddress)
223  {
225  UartBus bus = new UartBus
226  (
227  onClose,
228  stream::sendPacket,
229  busFromAddress
230  );
231 
232  stream.getPacketStreamer().addEventListener(bus::processPacket);
233  stream.startStreaming();
234  return bus;
235  }
236 }