31
31
import struct
32
32
import time
33
33
from random import randint
34
+
34
35
from micropython import const
36
+
35
37
from .matcher import MQTTMatcher
36
38
37
39
__version__ = "0.0.0+auto.0"
@@ -124,6 +126,19 @@ def wrap_socket(self, socket, server_hostname=None):
124
126
return _FakeSSLSocket (socket , self ._iface .TLS_MODE )
125
127
126
128
129
+ class NullLogger :
130
+ """Fake logger class that does not do anything"""
131
+
132
+ # pylint: disable=unused-argument
133
+ def nothing (self , msg : str , * args ) -> None :
134
+ """no action"""
135
+ pass
136
+
137
+ def __init__ (self ):
138
+ for log_level in ["debug" , "info" , "warning" , "error" , "critical" ]:
139
+ setattr (NullLogger , log_level , self .nothing )
140
+
141
+
127
142
class MQTT :
128
143
"""MQTT Client for CircuitPython.
129
144
@@ -186,7 +201,9 @@ def __init__(
186
201
self ._msg_size_lim = MQTT_MSG_SZ_LIM
187
202
self ._pid = 0
188
203
self ._timestamp = 0
189
- self .logger = None
204
+ self .logger = NullLogger ()
205
+ """An optional logging attribute that can be set with with a Logger
206
+ to enable debug logging."""
190
207
191
208
self ._reconnect_attempt = 0
192
209
self ._reconnect_timeout = float (0 )
@@ -270,9 +287,9 @@ def _get_connect_socket(self, host, port, *, timeout=1):
270
287
"ssl_context must be set before using adafruit_mqtt for secure MQTT."
271
288
)
272
289
273
- if self . logger is not None and port == MQTT_TLS_PORT :
290
+ if port == MQTT_TLS_PORT :
274
291
self .logger .info (f"Establishing a SECURE SSL connection to { host } :{ port } " )
275
- elif self . logger is not None :
292
+ else :
276
293
self .logger .info (f"Establishing an INSECURE connection to { host } :{ port } " )
277
294
278
295
addr_info = self ._socket_pool .getaddrinfo (
@@ -283,10 +300,9 @@ def _get_connect_socket(self, host, port, *, timeout=1):
283
300
sock = self ._socket_pool .socket (addr_info [0 ], addr_info [1 ])
284
301
except OSError as exc :
285
302
# Do not consider this for back-off.
286
- if self .logger is not None :
287
- self .logger .warning (
288
- f"Failed to create socket for host { addr_info [0 ]} and port { addr_info [1 ]} "
289
- )
303
+ self .logger .warning (
304
+ f"Failed to create socket for host { addr_info [0 ]} and port { addr_info [1 ]} "
305
+ )
290
306
raise TemporaryError from exc
291
307
292
308
connect_host = addr_info [- 1 ][0 ]
@@ -300,8 +316,7 @@ def _get_connect_socket(self, host, port, *, timeout=1):
300
316
sock .connect ((connect_host , port ))
301
317
except MemoryError as exc :
302
318
sock .close ()
303
- if self .logger is not None :
304
- self .logger .warning (f"Failed to allocate memory for connect: { exc } " )
319
+ self .logger .warning (f"Failed to allocate memory for connect: { exc } " )
305
320
# Do not consider this for back-off.
306
321
raise TemporaryError from exc
307
322
except OSError as exc :
@@ -352,8 +367,7 @@ def will_set(self, topic=None, payload=None, qos=0, retain=False):
352
367
:param bool retain: Specifies if the payload is to be retained when
353
368
it is published.
354
369
"""
355
- if self .logger is not None :
356
- self .logger .debug ("Setting last will properties" )
370
+ self .logger .debug ("Setting last will properties" )
357
371
self ._valid_qos (qos )
358
372
if self ._is_connected :
359
373
raise MMQTTException ("Last Will should only be called before connect()." )
@@ -447,10 +461,10 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
447
461
self ._recompute_reconnect_backoff ()
448
462
else :
449
463
self ._reset_reconnect_backoff ()
450
- if self . logger is not None :
451
- self .logger .debug (
452
- f"Attempting to connect to MQTT broker (attempt #{ self ._reconnect_attempt } )"
453
- )
464
+
465
+ self .logger .debug (
466
+ f"Attempting to connect to MQTT broker (attempt #{ self ._reconnect_attempt } )"
467
+ )
454
468
455
469
try :
456
470
ret = self ._connect (
@@ -462,18 +476,15 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
462
476
self ._reset_reconnect_backoff ()
463
477
return ret
464
478
except TemporaryError as e :
465
- if self .logger is not None :
466
- self .logger .warning (f"temporary error when connecting: { e } " )
479
+ self .logger .warning (f"temporary error when connecting: { e } " )
467
480
backoff = False
468
481
except OSError as e :
469
482
last_exception = e
470
- if self .logger is not None :
471
- self .logger .info (f"failed to connect: { e } " )
483
+ self .logger .info (f"failed to connect: { e } " )
472
484
backoff = True
473
485
except MMQTTException as e :
474
486
last_exception = e
475
- if self .logger is not None :
476
- self .logger .info (f"MMQT error: { e } " )
487
+ self .logger .info (f"MMQT error: { e } " )
477
488
backoff = True
478
489
479
490
if self ._reconnect_attempts_max > 1 :
@@ -502,14 +513,12 @@ def _connect(self, clean_session=True, host=None, port=None, keep_alive=None):
502
513
if keep_alive :
503
514
self .keep_alive = keep_alive
504
515
505
- if self .logger is not None :
506
- self .logger .debug ("Attempting to establish MQTT connection..." )
516
+ self .logger .debug ("Attempting to establish MQTT connection..." )
507
517
508
518
if self ._reconnect_attempt > 0 :
509
- if self .logger is not None :
510
- self .logger .debug (
511
- f"Sleeping for { self ._reconnect_timeout :.3} seconds due to connect back-off"
512
- )
519
+ self .logger .debug (
520
+ f"Sleeping for { self ._reconnect_timeout :.3} seconds due to connect back-off"
521
+ )
513
522
time .sleep (self ._reconnect_timeout )
514
523
515
524
# Get a new socket
@@ -565,11 +574,9 @@ def _connect(self, clean_session=True, host=None, port=None, keep_alive=None):
565
574
fixed_header .append (remaining_length )
566
575
fixed_header .append (0x00 )
567
576
568
- if self .logger is not None :
569
- self .logger .debug ("Sending CONNECT packet to broker..." )
570
- self .logger .debug (
571
- "Fixed Header: %s\n Variable Header: %s" , fixed_header , var_header
572
- )
577
+ self .logger .debug ("Sending CONNECT to broker..." )
578
+ self .logger .debug (f"Fixed Header: { fixed_header } " )
579
+ self .logger .debug (f"Variable Header: { var_header } " )
573
580
self ._sock .send (fixed_header )
574
581
self ._sock .send (var_header )
575
582
# [MQTT-3.1.3-4]
@@ -581,8 +588,7 @@ def _connect(self, clean_session=True, host=None, port=None, keep_alive=None):
581
588
if self ._username is not None :
582
589
self ._send_str (self ._username )
583
590
self ._send_str (self ._password )
584
- if self .logger is not None :
585
- self .logger .debug ("Receiving CONNACK packet from broker" )
591
+ self .logger .debug ("Receiving CONNACK packet from broker" )
586
592
stamp = time .monotonic ()
587
593
while True :
588
594
op = self ._wait_for_msg ()
@@ -607,15 +613,12 @@ def _connect(self, clean_session=True, host=None, port=None, keep_alive=None):
607
613
def disconnect (self ):
608
614
"""Disconnects the MiniMQTT client from the MQTT broker."""
609
615
self ._connected ()
610
- if self .logger is not None :
611
- self .logger .debug ("Sending DISCONNECT packet to broker" )
616
+ self .logger .debug ("Sending DISCONNECT packet to broker" )
612
617
try :
613
618
self ._sock .send (MQTT_DISCONNECT )
614
619
except RuntimeError as e :
615
- if self .logger is not None :
616
- self .logger .warning (f"Unable to send DISCONNECT packet: { e } " )
617
- if self .logger is not None :
618
- self .logger .debug ("Closing socket" )
620
+ self .logger .warning (f"Unable to send DISCONNECT packet: { e } " )
621
+ self .logger .debug ("Closing socket" )
619
622
self ._sock .close ()
620
623
self ._is_connected = False
621
624
self ._subscribed_topics = []
@@ -628,8 +631,7 @@ def ping(self):
628
631
Returns response codes of any messages received while waiting for PINGRESP.
629
632
"""
630
633
self ._connected ()
631
- if self .logger is not None :
632
- self .logger .debug ("Sending PINGREQ" )
634
+ self .logger .debug ("Sending PINGREQ" )
633
635
self ._sock .send (MQTT_PINGREQ )
634
636
ping_timeout = self .keep_alive
635
637
stamp = time .monotonic ()
@@ -699,15 +701,14 @@ def publish(self, topic, msg, retain=False, qos=0):
699
701
else :
700
702
pub_hdr_fixed .append (remaining_length )
701
703
702
- if self .logger is not None :
703
- self .logger .debug (
704
- "Sending PUBLISH\n Topic: %s\n Msg: %s\
705
- \n QoS: %d\n Retain? %r" ,
706
- topic ,
707
- msg ,
708
- qos ,
709
- retain ,
710
- )
704
+ self .logger .debug (
705
+ "Sending PUBLISH\n Topic: %s\n Msg: %s\
706
+ \n QoS: %d\n Retain? %r" ,
707
+ topic ,
708
+ msg ,
709
+ qos ,
710
+ retain ,
711
+ )
711
712
self ._sock .send (pub_hdr_fixed )
712
713
self ._sock .send (pub_hdr_var )
713
714
self ._sock .send (msg )
@@ -777,9 +778,8 @@ def subscribe(self, topic, qos=0):
777
778
topic_size = len (t .encode ("utf-8" )).to_bytes (2 , "big" )
778
779
qos_byte = q .to_bytes (1 , "big" )
779
780
packet += topic_size + t .encode () + qos_byte
780
- if self .logger is not None :
781
- for t , q in topics :
782
- self .logger .debug ("SUBSCRIBING to topic %s with QoS %d" , t , q )
781
+ for t , q in topics :
782
+ self .logger .debug ("SUBSCRIBING to topic %s with QoS %d" , t , q )
783
783
self ._sock .send (packet )
784
784
stamp = time .monotonic ()
785
785
while True :
@@ -831,12 +831,10 @@ def unsubscribe(self, topic):
831
831
for t in topics :
832
832
topic_size = len (t .encode ("utf-8" )).to_bytes (2 , "big" )
833
833
packet += topic_size + t .encode ()
834
- if self .logger is not None :
835
- for t in topics :
836
- self .logger .debug ("UNSUBSCRIBING from topic %s" , t )
834
+ for t in topics :
835
+ self .logger .debug ("UNSUBSCRIBING from topic %s" , t )
837
836
self ._sock .send (packet )
838
- if self .logger is not None :
839
- self .logger .debug ("Waiting for UNSUBACK..." )
837
+ self .logger .debug ("Waiting for UNSUBACK..." )
840
838
while True :
841
839
stamp = time .monotonic ()
842
840
op = self ._wait_for_msg ()
@@ -865,38 +863,34 @@ def _recompute_reconnect_backoff(self):
865
863
"""
866
864
self ._reconnect_attempt = self ._reconnect_attempt + 1
867
865
self ._reconnect_timeout = 2 ** self ._reconnect_attempt
868
- if self .logger is not None :
869
- # pylint: disable=consider-using-f-string
870
- self .logger .debug (
871
- "Reconnect timeout computed to {:.2f}" .format (self ._reconnect_timeout )
872
- )
866
+ # pylint: disable=consider-using-f-string
867
+ self .logger .debug (
868
+ "Reconnect timeout computed to {:.2f}" .format (self ._reconnect_timeout )
869
+ )
873
870
874
871
if self ._reconnect_timeout > self ._reconnect_maximum_backoff :
875
- if self .logger is not None :
876
- self .logger .debug (
877
- f"Truncating reconnect timeout to { self ._reconnect_maximum_backoff } seconds"
878
- )
872
+ self .logger .debug (
873
+ f"Truncating reconnect timeout to { self ._reconnect_maximum_backoff } seconds"
874
+ )
879
875
self ._reconnect_timeout = float (self ._reconnect_maximum_backoff )
880
876
881
877
# Add a sub-second jitter.
882
878
# Even truncated timeout should have jitter added to it. This is why it is added here.
883
879
jitter = randint (0 , 1000 ) / 1000
884
- if self .logger is not None :
885
- # pylint: disable=consider-using-f-string
886
- self .logger .debug (
887
- "adding jitter {:.2f} to {:.2f} seconds" .format (
888
- jitter , self ._reconnect_timeout
889
- )
880
+ # pylint: disable=consider-using-f-string
881
+ self .logger .debug (
882
+ "adding jitter {:.2f} to {:.2f} seconds" .format (
883
+ jitter , self ._reconnect_timeout
890
884
)
885
+ )
891
886
self ._reconnect_timeout += jitter
892
887
893
888
def _reset_reconnect_backoff (self ):
894
889
"""
895
890
Reset reconnect back-off to the initial state.
896
891
897
892
"""
898
- if self .logger is not None :
899
- self .logger .debug ("Resetting reconnect backoff" )
893
+ self .logger .debug ("Resetting reconnect backoff" )
900
894
self ._reconnect_attempt = 0
901
895
self ._reconnect_timeout = float (0 )
902
896
@@ -909,17 +903,13 @@ def reconnect(self, resub_topics=True):
909
903
910
904
"""
911
905
912
- if self .logger is not None :
913
- self .logger .debug ("Attempting to reconnect with MQTT broker" )
914
-
906
+ self .logger .debug ("Attempting to reconnect with MQTT broker" )
915
907
ret = self .connect ()
916
- if self .logger is not None :
917
- self .logger .debug ("Reconnected with broker" )
908
+ self .logger .debug ("Reconnected with broker" )
918
909
if resub_topics :
919
- if self .logger is not None :
920
- self .logger .debug (
921
- "Attempting to resubscribe to previously subscribed topics."
922
- )
910
+ self .logger .debug (
911
+ "Attempting to resubscribe to previously subscribed topics."
912
+ )
923
913
subscribed_topics = self ._subscribed_topics .copy ()
924
914
self ._subscribed_topics = []
925
915
while subscribed_topics :
@@ -938,16 +928,16 @@ def loop(self, timeout=0):
938
928
939
929
"""
940
930
931
+ self .logger .debug (f"waiting for messages for { timeout } seconds" )
941
932
if self ._timestamp == 0 :
942
933
self ._timestamp = time .monotonic ()
943
934
current_time = time .monotonic ()
944
935
if current_time - self ._timestamp >= self .keep_alive :
945
936
self ._timestamp = 0
946
937
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
947
- if self .logger is not None :
948
- self .logger .debug (
949
- "KeepAlive period elapsed - requesting a PINGRESP from the server..."
950
- )
938
+ self .logger .debug (
939
+ "KeepAlive period elapsed - requesting a PINGRESP from the server..."
940
+ )
951
941
rcs = self .ping ()
952
942
return rcs
953
943
@@ -960,10 +950,9 @@ def loop(self, timeout=0):
960
950
if rc is None :
961
951
break
962
952
if time .monotonic () - stamp > self ._recv_timeout :
963
- if self .logger is not None :
964
- self .logger .debug (
965
- f"Loop timed out, message queue not empty after { self ._recv_timeout } s"
966
- )
953
+ self .logger .debug (
954
+ f"Loop timed out, message queue not empty after { self ._recv_timeout } s"
955
+ )
967
956
break
968
957
rcs .append (rc )
969
958
@@ -996,8 +985,7 @@ def _wait_for_msg(self, timeout=0.1):
996
985
# If we get here, it means that there is nothing to be received
997
986
return None
998
987
if res [0 ] & MQTT_PKT_TYPE_MASK == MQTT_PINGRESP :
999
- if self .logger is not None :
1000
- self .logger .debug ("Got PINGRESP" )
988
+ self .logger .debug ("Got PINGRESP" )
1001
989
sz = self ._sock_exact_recv (1 )[0 ]
1002
990
if sz != 0x00 :
1003
991
raise MMQTTException (f"Unexpected PINGRESP returned from broker: { sz } ." )
@@ -1029,10 +1017,7 @@ def _wait_for_msg(self, timeout=0.1):
1029
1017
# read message contents
1030
1018
raw_msg = self ._sock_exact_recv (sz )
1031
1019
msg = raw_msg if self ._use_binary_mode else str (raw_msg , "utf-8" )
1032
- if self .logger is not None :
1033
- self .logger .debug (
1034
- "Receiving PUBLISH \n Topic: %s\n Msg: %s\n " , topic , raw_msg
1035
- )
1020
+ self .logger .debug ("Receiving PUBLISH \n Topic: %s\n Msg: %s\n " , topic , raw_msg )
1036
1021
self ._handle_on_message (self , topic , msg )
1037
1022
if res [0 ] & 0x06 == 0x02 :
1038
1023
pkt = bytearray (b"\x40 \x02 \0 \0 " )
@@ -1101,8 +1086,7 @@ def _sock_exact_recv(self, bufsize):
1101
1086
# This will timeout with socket timeout (not keepalive timeout)
1102
1087
rc = self ._sock .recv (bufsize )
1103
1088
if not rc :
1104
- if self .logger is not None :
1105
- self .logger .debug ("_sock_exact_recv timeout" )
1089
+ self .logger .debug ("_sock_exact_recv timeout" )
1106
1090
# If no bytes waiting, raise same exception as socketpool
1107
1091
raise OSError (errno .ETIMEDOUT )
1108
1092
# If any bytes waiting, try to read them all,
@@ -1187,13 +1171,12 @@ def enable_logger(self, log_pkg, log_level=20, logger_name="log"):
1187
1171
:return logger object
1188
1172
1189
1173
"""
1174
+ # pylint: disable=attribute-defined-outside-init
1190
1175
self .logger = log_pkg .getLogger (logger_name )
1191
1176
self .logger .setLevel (log_level )
1192
1177
1193
1178
return self .logger
1194
1179
1195
1180
def disable_logger (self ):
1196
1181
"""Disables logging."""
1197
- if not self .logger :
1198
- raise MMQTTException ("Can not disable logger, no logger found." )
1199
- self .logger = None
1182
+ self .logger = NullLogger ()
0 commit comments