diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 44749667..e3f44f75 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -253,7 +253,7 @@ def onConnected(interface): args = mt_config.args # do not print this line if we are exporting the config - if not args.export_config: + if not (args.export_config or args.json): print("Connected to radio") if args.setlat or args.setlon or args.setalt: @@ -445,8 +445,12 @@ def onConnected(interface): else: channelIndex = mt_config.channel_index or 0 if checkChannel(interface, channelIndex): - print(f"Sending telemetry request to {args.dest} on channelIndex:{channelIndex} (this could take a while)") - interface.sendTelemetry(destinationId=args.dest, wantResponse=True, channelIndex=channelIndex) + if not args.json: + print(f"Sending telemetry request to {args.dest} " + f"on channelIndex:{channelIndex} (this could take a while)") + interface.sendTelemetry( + destinationId=args.dest, wantResponse=True, channelIndex=channelIndex, jsonResponse=args.json + ) if args.request_position: if args.dest == BROADCAST_ADDR: @@ -454,8 +458,11 @@ def onConnected(interface): else: channelIndex = mt_config.channel_index or 0 if checkChannel(interface, channelIndex): - print(f"Sending position request to {args.dest} on channelIndex:{channelIndex} (this could take a while)") - interface.sendPosition(destinationId=args.dest, wantResponse=True, channelIndex=channelIndex) + if not args.json: + print(f"Sending position request to {args.dest} " + f"on channelIndex:{channelIndex} (this could take a while)") + interface.sendPosition(destinationId=args.dest, wantResponse=True, channelIndex=channelIndex, + jsonResponse=args.json) if args.gpio_wrb or args.gpio_rd or args.gpio_watch: if args.dest == BROADCAST_ADDR: @@ -819,9 +826,13 @@ def setSimpleConfig(modem_preset): if args.nodes: closeNow = True if args.dest != BROADCAST_ADDR: - print("Showing node list of a remote node is not supported.") + if args.json: + print("[]") + else: + print("Showing node list of a remote node is not supported.") return - interface.showNodes() + interface.showNodes(jsonResponse=args.json) + if args.qr or args.qr_all: closeNow = True @@ -866,7 +877,10 @@ def setSimpleConfig(modem_preset): interface.close() # after running command then exit except Exception as ex: - print(f"Aborting due to: {ex}") + if args.json: + print("") + else: + print(f"Aborting due to: {ex}") interface.close() # close the connection now, so that our app exits sys.exit(1) @@ -1461,6 +1475,12 @@ def initParser(): "--debug", help="Show API library debug log messages", action="store_true" ) + group.add_argument( + "--json", + help="Output JSON objects for --nodes, --request-telemetry, --request-position", + action="store_true" + ) + group.add_argument( "--test", help="Run stress test against all connected Meshtastic devices", diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index e6848502..9d4b54c7 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -138,7 +138,7 @@ def showInfo(self, file=sys.stdout) -> str: # pylint: disable=W0613 print(infos) return infos - def showNodes(self, includeSelf: bool=True, file=sys.stdout) -> str: # pylint: disable=W0613 + def showNodes(self, includeSelf: bool=True, jsonResponse: bool=False) -> List: # pylint: disable=W0613 """Show table summary of nodes in mesh""" def formatFloat(value, precision=2, unit="") -> Optional[str]: @@ -224,9 +224,13 @@ def getTimeAgo(ts) -> Optional[str]: for i, row in enumerate(rows): row["N"] = i + 1 - table = tabulate(rows, headers="keys", missingval="N/A", tablefmt="fancy_grid") - print(table) - return table + if jsonResponse: + print(json.dumps(rows, indent=2)) + else: + table = tabulate(rows, headers="keys", missingval="N/A", tablefmt="fancy_grid") + print(table) + + return rows def getNode(self, nodeId: str, requestChannels: bool=True) -> meshtastic.node.Node: """Return a node object which contains device settings and channel info""" @@ -353,6 +357,7 @@ def sendPosition( wantAck: bool=False, wantResponse: bool=False, channelIndex: int=0, + jsonResponse: bool=False, ): """ Send a position packet to some other node (normally a broadcast) @@ -384,7 +389,10 @@ def sendPosition( logging.debug(f"p.time:{p.time}") if wantResponse: - onResponse = self.onResponsePosition + if jsonResponse: + onResponse = self.onResponsePositionJson + else: + onResponse = self.onResponsePosition else: onResponse = None @@ -401,33 +409,69 @@ def sendPosition( self.waitForPosition() return d - def onResponsePosition(self, p): - """on response for position""" - if p["decoded"]["portnum"] == 'POSITION_APP': - self._acknowledgment.receivedPosition = True - position = mesh_pb2.Position() - position.ParseFromString(p["decoded"]["payload"]) - - ret = "Position received: " - if position.latitude_i != 0 and position.longitude_i != 0: - ret += f"({position.latitude_i * 10**-7}, {position.longitude_i * 10**-7})" - else: - ret += "(unknown)" - if position.altitude != 0: - ret += f" {position.altitude}m" - - if position.precision_bits not in [0,32]: - ret += f" precision:{position.precision_bits}" - elif position.precision_bits == 32: - ret += " full precision" - elif position.precision_bits == 0: - ret += " position disabled" + def onResponsePosition(self, + destinationId: Union[int, str] = BROADCAST_ADDR, + jsonResponse: bool = False + ): - print(ret) + def responsePosition(p): + """on response for position""" + if p["decoded"]["portnum"] == 'POSITION_APP': + self._acknowledgment.receivedPosition = True + position = mesh_pb2.Position() + position.ParseFromString(p["decoded"]["payload"]) - elif p["decoded"]["portnum"] == 'ROUTING_APP': - if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE': - our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.") + if jsonResponse: + self.printJsonPosition(position, destinationId) + else: + self.printPosition(position) + + elif p["decoded"]["portnum"] == 'ROUTING_APP': + if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE': + if jsonResponse: + our_exit("{}") + else: + our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.") + return responsePosition + + @staticmethod + def printPosition(position: mesh_pb2.Position): + ret = "Position received: " + if position.latitude_i != 0 and position.longitude_i != 0: + ret += f"({position.latitude_i * 10**-7}, {position.longitude_i * 10**-7})" + else: + ret += "(unknown)" + if position.altitude != 0: + ret += f" {position.altitude}m" + + if position.precision_bits not in [0,32]: + ret += f" precision:{position.precision_bits}" + elif position.precision_bits == 32: + ret += " full precision" + elif position.precision_bits == 0: + ret += " position disabled" + + print(ret) + + @staticmethod + def printJsonPosition(position: mesh_pb2.Position, + destinationId: Union[int, str] = BROADCAST_ADDR): + json_output = { + "node_id": destinationId + } + json_output["latitude"] = position.latitude_i * 10**-7 + json_output["longitude"] = position.longitude_i * 10**-7 + json_output["altitude"] = position.altitude + + json_output["precision"] = position.precision_bits + json_output["full"] = False + json_output["enabled"] = True + if position.precision_bits == 32: + json_output["full"] = True + elif position.precision_bits == 0: + json_output["enabled"] = False + + print(json.dumps(json_output, indent=2)) def sendTraceRoute(self, dest: Union[int, str], hopLimit: int, channelIndex: int=0): """Send the trace route""" @@ -460,7 +504,11 @@ def onResponseTraceRoute(self, p): self._acknowledgment.receivedTraceRoute = True - def sendTelemetry(self, destinationId: Union[int,str]=BROADCAST_ADDR, wantResponse: bool=False, channelIndex: int=0): + def sendTelemetry(self, + destinationId: Union[int, str] = BROADCAST_ADDR, + wantResponse: bool = False, + channelIndex: int = 0, + jsonResponse: bool = False): """Send telemetry and optionally ask for a response""" r = telemetry_pb2.Telemetry() @@ -482,10 +530,9 @@ def sendTelemetry(self, destinationId: Union[int,str]=BROADCAST_ADDR, wantRespon if air_util_tx is not None: r.device_metrics.air_util_tx = air_util_tx + onResponse = None if wantResponse: - onResponse = self.onResponseTelemetry - else: - onResponse = None + onResponse = self.onResponseTelemetry(destinationId, jsonResponse) self.sendData( r, @@ -498,28 +545,59 @@ def sendTelemetry(self, destinationId: Union[int,str]=BROADCAST_ADDR, wantRespon if wantResponse: self.waitForTelemetry() - def onResponseTelemetry(self, p): - """on response for telemetry""" - if p["decoded"]["portnum"] == 'TELEMETRY_APP': - self._acknowledgment.receivedTelemetry = True - telemetry = telemetry_pb2.Telemetry() - telemetry.ParseFromString(p["decoded"]["payload"]) - - print("Telemetry received:") - if telemetry.device_metrics.battery_level is not None: - print(f"Battery level: {telemetry.device_metrics.battery_level:.2f}%") - if telemetry.device_metrics.voltage is not None: - print(f"Voltage: {telemetry.device_metrics.voltage:.2f} V") - if telemetry.device_metrics.channel_utilization is not None: - print( - f"Total channel utilization: {telemetry.device_metrics.channel_utilization:.2f}%" - ) - if telemetry.device_metrics.air_util_tx is not None: - print(f"Transmit air utilization: {telemetry.device_metrics.air_util_tx:.2f}%") - - elif p["decoded"]["portnum"] == 'ROUTING_APP': - if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE': - our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.") + def onResponseTelemetry(self, + destinationId: Union[int, str] = BROADCAST_ADDR, + jsonResponse: bool = False + ): + def responseTelemetry(p): + """on response for telemetry""" + if p["decoded"]["portnum"] == 'TELEMETRY_APP': + self._acknowledgment.receivedTelemetry = True + telemetry = telemetry_pb2.Telemetry() + telemetry.ParseFromString(p["decoded"]["payload"]) + + if jsonResponse: + self.printJsonTelemetry(telemetry, destinationId) + else: + self.printTelemetry(telemetry) + + elif p["decoded"]["portnum"] == 'ROUTING_APP': + if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE': + if jsonResponse: + our_exit("{}") + else: + our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.") + return responseTelemetry + + @staticmethod + def printTelemetry(telemetry: telemetry_pb2.Telemetry): + print("Telemetry received:") + if telemetry.device_metrics.battery_level is not None: + print(f"Battery level: {telemetry.device_metrics.battery_level:.2f}%") + if telemetry.device_metrics.voltage is not None: + print(f"Voltage: {telemetry.device_metrics.voltage:.2f} V") + if telemetry.device_metrics.channel_utilization is not None: + print( + f"Total channel utilization: {telemetry.device_metrics.channel_utilization:.2f}%" + ) + if telemetry.device_metrics.air_util_tx is not None: + print(f"Transmit air utilization: {telemetry.device_metrics.air_util_tx:.2f}%") + + @staticmethod + def printJsonTelemetry(telemetry: telemetry_pb2.Telemetry, + destinationId: Union[int, str] = BROADCAST_ADDR): + json_output = { + "node_id": destinationId + } + if telemetry.device_metrics.battery_level is not None: + json_output["batteryLevel"] = telemetry.device_metrics.battery_level + if telemetry.device_metrics.voltage is not None: + json_output["voltage"] = telemetry.device_metrics.voltage + if telemetry.device_metrics.channel_utilization is not None: + json_output["channel_utilization"] = telemetry.device_metrics.channel_utilization + if telemetry.device_metrics.air_util_tx is not None: + json_output["air_util_tx"] = telemetry.device_metrics.air_util_tx + print(json.dumps(json_output, indent=2)) def _addResponseHandler(self, requestId: int, callback: Callable): self.responseHandlers[requestId] = ResponseHandler(callback) diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index dba635b4..27eb9c2b 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -408,15 +408,38 @@ def test_main_nodes(capsys): iface = MagicMock(autospec=SerialInterface) - def mock_showNodes(): - print("inside mocked showNodes") + def mock_showNodes(jsonResponse: bool = False): + print(f"inside mocked showNodes {jsonResponse}") + assert not jsonResponse iface.showNodes.side_effect = mock_showNodes with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo: main() out, err = capsys.readouterr() assert re.search(r"Connected to radio", out, re.MULTILINE) - assert re.search(r"inside mocked showNodes", out, re.MULTILINE) + assert re.search(r"inside mocked showNodes False", out, re.MULTILINE) + assert err == "" + mo.assert_called() + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_main_nodes_json(capsys): + """Test --nodes --json""" + sys.argv = ["", "--nodes", "--json"] + mt_config.args = sys.argv + + iface = MagicMock(autospec=SerialInterface) + + def mock_showNodes(jsonResponse: bool = False): + print(f"inside mocked showNodes {jsonResponse}") + assert jsonResponse + + iface.showNodes.side_effect = mock_showNodes + with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo: + main() + out, err = capsys.readouterr() + assert re.search(r"inside mocked showNodes True", out, re.MULTILINE) assert err == "" mo.assert_called()