Skip to content
This repository was archived by the owner on Aug 2, 2020. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 157 additions & 77 deletions esp32/modules_disobey2019/ir.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,164 @@
from machine import Pin, PWM
from time import sleep_ms
import gc
import machine
import time

optics_sleep = 50
# Basics for receiving:
# * Startup by enabling the IR receiver to eat power
# * Enable an interrupt (hardware) for receiving the initial pulse
# * When the interrupt is fired disable the rest of the interrups for the duration of this read
# * Read the pulses with time_pulse_us (it's extremely fast)
# * Store this in a buffer with a fixed size, (256 empty arrays)
# * Set a timer to decode the data instantly after interrupts are enabled again.
# * a class can inherit this class and just define a decoder and specify a send pattern
# * if a send pattern is not the way to go (some totally different protocols) you can just implement another tx function

pin_rx_enable = Pin(21, Pin.OUT)
pin_rx = Pin(18, Pin.IN)
pin_tx = Pin(19, Pin.OUT)
pwm_tx = PWM(pin_tx, freq=38000, duty=0)
class BadgeIr():
freq = 38000
ticks = 500
start = 4000
rxpin = 18
txpin = 19
rxenablepin = 21
rxtimer = None
readcallback = None
bitform = []
pwm_tx = None

# Functions for switching power to the receiver on and off
def initbuffer(self):
self.buffer=[[]] * 256
self.bufpos = 0
# If it returns 1 the buffer gets emptied out and initialized.
# This is the default, protocol implementations should choose if they want to.
def decoder(self):
return 1
def real_decoder(self,timer):
self.rxtimer.deinit()
self.rxtimer = None
if self.decoder():
self.initbuffer()
def cleanbuffer(self,i):
self.buffer=self.buffer[i:256]+[[]]*i
self.bufpos-=i
gc.collect()
def mr(self):
if self.bufpos == len(self.buffer):
return 0
waitfor=0 if self.pin_rx.value() else 1
while True:
t=machine.time_pulse_us(self.pin_rx,waitfor,50*1000)
if t<0:
if t == -2:
return 1
return 0
elif t>0:
waitfor = 0 if waitfor else 1
self.buffer[self.bufpos]=[waitfor,round(t/self.ticks)]
self.bufpos+=1
if self.bufpos == len(self.buffer):
return 1
def callback(self,pin):
irqs = machine.disable_irq()
hasdata = self.mr()
machine.enable_irq(irqs)
if hasdata and not self.rxtimer:
self.rxtimer = machine.Timer(1)
self.rxtimer.init(mode=machine.Timer.ONE_SHOT, period=1,callback=self.real_decoder)
def rx_enable(self):
self.pin_rx_enable = machine.Pin(self.rxenablepin, machine.Pin.OUT)
self.pin_rx = machine.Pin(self.rxpin, machine.Pin.IN)
self.initbuffer()
self.pin_rx_enable.value(True)
self.pin_rx.irq(trigger=machine.Pin.IRQ_FALLING, handler=self.callback)
def rx_disable(self):
self.pin_rx.irq(trigger=0, handler=self.callback)
self.pin_rx_enable.value(False)
def tx_enable(self):
if not self.pwm_tx:
self.pin_tx = machine.Pin(self.txpin, machine.Pin.OUT)
self.pwm_tx = machine.PWM(self.pin_tx, freq = self.freq, duty = 0)
def tx_disable(self):
self.pwm_tx.duty(0)
def tx_setduty(self,duty):
self.pwm_tx.duty(512 * duty)
def txBit(self,bit):
for (onoff,tijd) in self.bitform[bit]:
self.tx_setduty(onoff)
time.sleep_us(tijd)
def txByte(self,byte):
for bit in range(8):
self.txBit( ( byte >> ( 7 - bit ) ) & 1 ) # MSB

def enableRx(state=True):
pin_rx_enable.value(state)

def disableRx():
pin_rx_enable.value(False)
class NecIR(BadgeIr):
# Implements NEC Infrared
# Example:
# IR=NecIR()
# NecIR.command= <function (address,command)>
# NecIR.repeat= <function ()>
# NecIR.rx_enable()
# To stop receiving:
# NecIR.rx_disable()
# To send:
# NecIR.tx(<byte address>,<byte command>)
# NecIR.tx_repeat()
command = None
repeat = None
bitform = { 0: [[1,562],[0,562]], 1: [[1,562],[0,1687]], 's': [[1,9000],[0,4500]], 'e': [[1,562],[0,100]], 'r': [[1,9000],[0,2500],[1,562],[0,100]] }

# Functions for directly talking to the receiver and LED
def tx(self,addr,cmd):
self.tx_enable()
self.txBit('s')
self.txByte(addr)
self.txByte(addr ^ 0xFF)
self.txByte(cmd)
self.txByte(cmd ^ 0xFF)
self.txBit('e')
self.tx_disable()

def rawTx(p):
pwm_tx.duty(512*p)

def rawRx():
return not pin_rx.value()
def tx_repeat(self):
self.txBit('r')

# Helper functions for the (proof of concept) text transmission feature

def _txByte(byte):
for bit in range(8):
rawTx((byte>>(7-bit))&1)
sleep_ms(optics_sleep)

buf = 0

def _rxBit():
global buf
buf = ((buf<<1) + rawRx()*1)&0xFFFF
sleep_ms(optics_sleep)

def _rxByte():
global buf
for i in range(8):
_rxBit()
return buf & 0xFF

def _rxWait(timeout=100):
global buf
cnt = 0
while True:
_rxBit()
if buf&0xFFFF == 0b1110101010101011:
buf = 0
return True
#rxDebug()
cnt+=1
if cnt > timeout:
return False

# Functions for text transmission (proof of concept)

def tx(data):
rawTx(False)
sleep_ms(optics_sleep*8)
_txByte(0b11101010)
_txByte(0b10101011)
for char in data:
txByte(ord(char))
rawTx(False)

def rx(timeout=100):
pin_rx_enable.value(True)
global buf
string = ""
buf = 0
if not _rxWait(timeout):
pin_rx_enable.value(False)
return None
while True:
b = _rxByte()
if b < 1:
pin_rx_enable.value(False)
return string
string += chr(b)
def decoder(self):
decoded=0
i=0
while True and self.bufpos-i>0:
(val,time)=self.buffer[i]
i+=1
if val==0 and time==9:
if self.bufpos<66: return(0) # Not yet complete....
p1=None
p2=None
bits=0
while True and self.bufpos-i>0:
(val,time)=self.buffer[i]
i+=1
if time>0:
if p1==None:
p1=(val,time)
if bits==32 and p1[1]==1:
self.cleanbuffer(i)
if (decoded >> 24 & 0xFF) == (0xFF ^ (decoded >> 16 & 0xFF)) and (decoded >> 8 & 0xFF) == (0xFF ^ (decoded >> 0 & 0xFF)) and self.command:
self.command(decoded >> 24 & 0xFF,decoded >> 8 & 0xFF)
return(0)
else:
p2=(val,time)
if p1[1]==1 and p2[1]==3:
decoded=decoded<<1 | 1
bits+=1
elif p1[1]==1 and p2[1]==1:
decoded=decoded<<1
bits+=1
if bits==32 and p2==None:
self.cleanbuffer(i)
return(0)
p1=None
p2=None
elif time<0:
self.cleanbuffer(i)
return(0)
elif val==1 and time==18:
if self.buffer[i] == [0,4] and self.buffer[i+1] == [1,1]:
i+=2
if self.repeat: self.repeat()
return(0)
self.cleanbuffer(i)
return(0)