This commit is contained in:
joan 2014-11-25 17:10:05 +00:00
parent 6e8073871d
commit e8a8ce1982
26 changed files with 1712 additions and 2 deletions

View File

@ -1 +0,0 @@
pigpio C++ examples

View File

@ -0,0 +1,283 @@
#!/usr/bin/env python
# 2014-07-11 DHT22.py
import time
import atexit
import pigpio
class sensor:
"""
A class to read relative humidity and temperature from the
DHT22 sensor. The sensor is also known as the AM2302.
The sensor can be powered from the Pi 3V3 or the Pi 5V rail.
Powering from the 3V3 rail is simpler and safer. You may need
to power from 5V if the sensor is connected via a long cable.
For 3V3 operation connect pin 1 to 3V3 and pin 4 to ground.
Connect pin 2 to a gpio.
For 5V operation connect pin 1 to 5V and pin 4 to ground.
The following pin 2 connection works for me. Use at YOUR OWN RISK.
5V--5K_resistor--+--10K_resistor--Ground
|
DHT22 pin 2 -----+
|
gpio ------------+
"""
def __init__(self, pi, gpio, LED=None, power=None):
"""
Instantiate with the Pi and gpio to which the DHT22 output
pin is connected.
Optionally a LED may be specified. This will be blinked for
each successful reading.
Optionally a gpio used to power the sensor may be specified.
This gpio will be set high to power the sensor. If the sensor
locks it will be power cycled to restart the readings.
Taking readings more often than about once every two seconds will
eventually cause the DHT22 to hang. A 3 second interval seems OK.
"""
self.pi = pi
self.gpio = gpio
self.LED = LED
self.power = power
if power is not None:
pi.write(power, 1) # Switch sensor on.
time.sleep(2)
self.powered = True
self.cb = None
atexit.register(self.cancel)
self.bad_CS = 0 # Bad checksum count.
self.bad_SM = 0 # Short message count.
self.bad_MM = 0 # Missing message count.
self.bad_SR = 0 # Sensor reset count.
# Power cycle if timeout > MAX_TIMEOUTS.
self.no_response = 0
self.MAX_NO_RESPONSE = 2
self.rhum = -999
self.temp = -999
self.tov = None
self.high_tick = 0
self.bit = 40
pi.set_pull_up_down(gpio, pigpio.PUD_OFF)
pi.set_watchdog(gpio, 0) # Kill any watchdogs.
self.cb = pi.callback(gpio, pigpio.EITHER_EDGE, self._cb)
def _cb(self, gpio, level, tick):
"""
Accumulate the 40 data bits. Format into 5 bytes, humidity high,
humidity low, temperature high, temperature low, checksum.
"""
diff = pigpio.tickDiff(self.high_tick, tick)
if level == 0:
# Edge length determines if bit is 1 or 0.
if diff >= 50:
val = 1
if diff >= 200: # Bad bit?
self.CS = 256 # Force bad checksum.
else:
val = 0
if self.bit >= 40: # Message complete.
self.bit = 40
elif self.bit >= 32: # In checksum byte.
self.CS = (self.CS<<1) + val
if self.bit == 39:
# 40th bit received.
self.pi.set_watchdog(self.gpio, 0)
self.no_response = 0
total = self.hH + self.hL + self.tH + self.tL
if (total & 255) == self.CS: # Is checksum ok?
self.rhum = ((self.hH<<8) + self.hL) * 0.1
if self.tH & 128: # Negative temperature.
mult = -0.1
self.tH = self.tH & 127
else:
mult = 0.1
self.temp = ((self.tH<<8) + self.tL) * mult
self.tov = time.time()
if self.LED is not None:
self.pi.write(self.LED, 0)
else:
self.bad_CS += 1
elif self.bit >=24: # in temp low byte
self.tL = (self.tL<<1) + val
elif self.bit >=16: # in temp high byte
self.tH = (self.tH<<1) + val
elif self.bit >= 8: # in humidity low byte
self.hL = (self.hL<<1) + val
elif self.bit >= 0: # in humidity high byte
self.hH = (self.hH<<1) + val
else: # header bits
pass
self.bit += 1
elif level == 1:
self.high_tick = tick
if diff > 250000:
self.bit = -2
self.hH = 0
self.hL = 0
self.tH = 0
self.tL = 0
self.CS = 0
else: # level == pigpio.TIMEOUT:
self.pi.set_watchdog(self.gpio, 0)
if self.bit < 8: # Too few data bits received.
self.bad_MM += 1 # Bump missing message count.
self.no_response += 1
if self.no_response > self.MAX_NO_RESPONSE:
self.no_response = 0
self.bad_SR += 1 # Bump sensor reset count.
if self.power is not None:
self.powered = False
self.pi.write(self.power, 0)
time.sleep(2)
self.pi.write(self.power, 1)
time.sleep(2)
self.powered = True
elif self.bit < 39: # Short message receieved.
self.bad_SM += 1 # Bump short message count.
self.no_response = 0
else: # Full message received.
self.no_response = 0
def temperature(self):
"""Return current temperature."""
return self.temp
def humidity(self):
"""Return current relative humidity."""
return self.rhum
def staleness(self):
"""Return time since measurement made."""
if self.tov is not None:
return time.time() - self.tov
else:
return -999
def bad_checksum(self):
"""Return count of messages received with bad checksums."""
return self.bad_CS
def short_message(self):
"""Return count of short messages."""
return self.bad_SM
def missing_message(self):
"""Return count of missing messages."""
return self.bad_MM
def sensor_resets(self):
"""Return count of power cycles because of sensor hangs."""
return self.bad_SR
def trigger(self):
"""Trigger a new relative humidity and temperature reading."""
if self.powered:
if self.LED is not None:
self.pi.write(self.LED, 1)
self.pi.write(self.gpio, pigpio.LOW)
time.sleep(0.017) # 17 ms
self.pi.set_mode(self.gpio, pigpio.INPUT)
self.pi.set_watchdog(self.gpio, 200)
def cancel(self):
"""Cancel the DHT22 sensor."""
self.pi.set_watchdog(self.gpio, 0)
if self.cb != None:
self.cb.cancel()
self.cb = None
if __name__ == "__main__":
import time
import pigpio
import DHT22
# Intervals of about 2 seconds or less will eventually hang the DHT22.
INTERVAL=3
pi = pigpio.pi()
s = DHT22.sensor(pi, 22, LED=16, power=8)
r = 0
next_reading = time.time()
while True:
r += 1
s.trigger()
time.sleep(0.2)
print("{} {} {} {:3.2f} {} {} {} {}".format(
r, s.humidity(), s.temperature(), s.staleness(),
s.bad_checksum(), s.short_message(), s.missing_message(),
s.sensor_resets()))
next_reading += INTERVAL
time.sleep(next_reading-time.time()) # Overall INTERVAL second polling.
s.cancel()
pi.stop()

View File

@ -0,0 +1,2 @@
Class to read the relative humidity and temperature from a DHT22/AM2302 sensor.

View File

@ -0,0 +1,2 @@
Script to display the status of gpios 0-31.

View File

@ -0,0 +1,62 @@
#!/usr/bin/env python
import time
import curses
import atexit
import pigpio
GPIOS=32
MODES=["INPUT", "OUTPUT", "ALT5", "ALT4", "ALT0", "ALT1", "ALT2", "ALT3"]
def cleanup():
curses.nocbreak()
curses.echo()
curses.endwin()
pi.stop()
pi = pigpio.pi()
stdscr = curses.initscr()
curses.noecho()
curses.cbreak()
atexit.register(cleanup)
cb = []
for g in range(GPIOS):
cb.append(pi.callback(g, pigpio.EITHER_EDGE))
# disable gpio 28 as the PCM clock is swamping the system
cb[28].cancel()
stdscr.nodelay(1)
stdscr.addstr(0, 23, "Status of gpios 0-31", curses.A_REVERSE)
while True:
for g in range(GPIOS):
tally = cb[g].tally()
mode = pi.get_mode(g)
col = (g / 11) * 25
row = (g % 11) + 2
stdscr.addstr(row, col, "{:2}".format(g), curses.A_BOLD)
stdscr.addstr(
"={} {:>6}: {:<10}".format(pi.read(g), MODES[mode], tally))
stdscr.refresh()
time.sleep(0.1)
c = stdscr.getch()
if c != curses.ERR:
break

View File

@ -0,0 +1,2 @@
Program to show status changes for a Hall effect sensor.

View File

@ -0,0 +1,32 @@
#!/usr/bin/env python
import time
import pigpio
#
# OH3144E or equivalent Hall effect sensor
#
# Pin 1 - 5V
# Pin 2 - Ground
# Pin 3 - gpio (here P1-8, gpio 14, TXD is used)
#
# The internal gpio pull-up is enabled so that the sensor
# normally reads high. It reads low when a magnet is close.
#
HALL=14
pi = pigpio.pi() # connect to local Pi
pi.set_mode(HALL, pigpio.INPUT)
pi.set_pull_up_down(HALL, pigpio.PUD_UP)
start = time.time()
while (time.time() - start) < 60:
print("Hall = {}".format(pi.read(HALL)))
time.sleep(0.2)
pi.stop()

View File

@ -0,0 +1,163 @@
#!/usr/bin/env python
import time
import pigpio
class sniffer:
"""
A class to passively monitor activity on an I2C bus. This should
work for an I2C bus running at 100kbps or less. You are unlikely
to get any usable results for a bus running any faster.
"""
def __init__(self, pi, SCL, SDA, set_as_inputs=True):
"""
Instantiate with the Pi and the gpios for the I2C clock
and data lines.
If you are monitoring one of the Raspberry Pi buses you
must set set_as_inputs to False so that they remain in
I2C mode.
The pigpio daemon should have been started with a higher
than default sample rate.
For an I2C bus rate of 100Kbps sudo pigpiod -s 2 should work.
A message is printed for each I2C transaction formatted with
"[" for the START
"XX" two hex characters for each data byte
"+" if the data is ACKd, "-" if the data is NACKd
"]" for the STOP
E.g. Reading the X, Y, Z values from an ADXL345 gives:
[A6+32+]
[A7+01+FF+F2+FF+06+00-]
"""
self.pi = pi
self.gSCL = SCL
self.gSDA = SDA
self.FALLING = 0
self.RISING = 1
self.STEADY = 2
self.in_data = False
self.byte = 0
self.bit = 0
self.oldSCL = 1
self.oldSDA = 1
self.transact = ""
if set_as_inputs:
self.pi.set_mode(SCL, pigpio.INPUT)
self.pi.set_mode(SDA, pigpio.INPUT)
self.cbA = self.pi.callback(SCL, pigpio.EITHER_EDGE, self._cb)
self.cbB = self.pi.callback(SDA, pigpio.EITHER_EDGE, self._cb)
def _parse(self, SCL, SDA):
"""
Accumulate all the data between START and STOP conditions
into a string and output when STOP is detected.
"""
if SCL != self.oldSCL:
self.oldSCL = SCL
if SCL:
xSCL = self.RISING
else:
xSCL = self.FALLING
else:
xSCL = self.STEADY
if SDA != self.oldSDA:
self.oldSDA = SDA
if SDA:
xSDA = self.RISING
else:
xSDA = self.FALLING
else:
xSDA = self.STEADY
if xSCL == self.RISING:
if self.in_data:
if self.bit < 8:
self.byte = (self.byte << 1) | SDA
self.bit += 1
else:
self.transact += '{:02X}'.format(self.byte)
if SDA:
self.transact += '-'
else:
self.transact += '+'
self.bit = 0
self.byte = 0
elif xSCL == self.STEADY:
if xSDA == self.RISING:
if SCL:
self.in_data = False
self.byte = 0
self.bit = 0
self.transact += ']' # STOP
print (self.transact)
self.transact = ""
if xSDA == self.FALLING:
if SCL:
self.in_data = True
self.byte = 0
self.bit = 0
self.transact += '[' # START
def _cb(self, gpio, level, tick):
"""
Check which line has altered state (ignoring watchdogs) and
call the parser with the new state.
"""
SCL = self.oldSCL
SDA = self.oldSDA
if gpio == self.gSCL:
if level == 0:
SCL = 0
elif level == 1:
SCL = 1
if gpio == self.gSDA:
if level == 0:
SDA = 0
elif level == 1:
SDA = 1
self._parse(SCL, SDA)
def cancel(self):
"""Cancel the I2C callbacks."""
self.cbA.cancel()
self.cbB.cancel()
if __name__ == "__main__":
import time
import pigpio
import I2C_sniffer
pi = pigpio.pi()
s = I2C_sniffer.sniffer(pi, 1, 0, False) # leave gpios 1/0 in I2C mode
time.sleep(60000)
s.cancel()
pi.stop()

View File

@ -0,0 +1,2 @@
A program to passively sniff I2C transactions (100kHz bus maximum) and display the results.

View File

@ -0,0 +1,2 @@
Class to hash a code from an IR receiver (reading an IR remote control).

View File

@ -0,0 +1,165 @@
#!/usr/bin/env python
import pigpio
class hasher:
"""
This class forms a hash over the IR pulses generated by an
IR remote.
The remote key press is not converted into a code in the manner of
the lirc module. No attempt is made to decode the type of protocol
used by the remote. The hash is likely to be unique for different
keys and different remotes but this is not guaranteed.
This hashing process works for some remotes/protocols but not for
others. The only way to find out if it works for one or more of
your remotes is to try it and see.
EXAMPLE CODE
#!/usr/bin/env python
import time
import pigpio
import ir_hasher
def callback(hash):
print("hash={}".format(hash));
pi = pigpio.pi()
ir = ir_hasher.hasher(pi, 7, callback, 5)
print("ctrl c to exit");
time.sleep(300)
pi.stop()
"""
def __init__(self, pi, gpio, callback, timeout=5):
"""
Initialises an IR remote hasher on a pi's gpio. A gap of timeout
milliseconds indicates the end of the remote key press.
"""
self.pi = pi
self.gpio = gpio
self.code_timeout = timeout
self.callback = callback
self.in_code = False
pi.set_mode(gpio, pigpio.INPUT)
self.cb = pi.callback(gpio, pigpio.EITHER_EDGE, self._cb)
def _hash(self, old_val, new_val):
if new_val < (old_val * 0.60):
val = 13
elif old_val < (new_val * 0.60):
val = 23
else:
val = 2
self.hash_val = self.hash_val ^ val
self.hash_val *= 16777619 # FNV_PRIME_32
self.hash_val = self.hash_val & ((1<<32)-1)
def _cb(self, gpio, level, tick):
if level != pigpio.TIMEOUT:
if self.in_code == False:
self.in_code = True
self.pi.set_watchdog(self.gpio, self.code_timeout)
self.hash_val = 2166136261 # FNV_BASIS_32
self.edges = 1
self.t1 = None
self.t2 = None
self.t3 = None
self.t4 = tick
else:
self.edges += 1
self.t1 = self.t2
self.t2 = self.t3
self.t3 = self.t4
self.t4 = tick
if self.t1 is not None:
d1 = pigpio.tickDiff(self.t1,self.t2)
d2 = pigpio.tickDiff(self.t3,self.t4)
self._hash(d1, d2)
else:
if self.in_code:
self.in_code = False
self.pi.set_watchdog(self.gpio, 0)
if self.edges > 12:
self.callback(self.hash_val)
if __name__ == "__main__":
import time
import pigpio
import ir_hasher
hashes = {
142650387: '2', 244341844: 'menu', 262513468: 'vol-',
272048826: '5', 345069212: '6', 363685443: 'prev.ch',
434191356: '1', 492745084: 'OK', 549497027: 'mute',
603729091: 'text', 646476378: 'chan-', 832916949: 'home',
923778138: 'power', 938165610: 'power', 953243510: 'forward',
1009731980:'1', 1018231875:'TV', 1142888517:'c-up',
1151589683:'chan+', 1344018636:'OK', 1348032067:'chan+',
1367109971:'prev.ch', 1370712102:'c-left', 1438405361:'rewind',
1452589043:'pause', 1518578730:'chan-', 1554432645:'8',
1583569525:'0', 1629745313:'rewind', 1666513749:'record',
1677653754:'c-down', 1825951717:'c-right', 1852412236:'6',
1894279468:'9', 1904895749:'vol+', 1941947509:'ff',
2076573637:'0', 2104823531:'back', 2141641957:'home',
2160787557:'record', 2398525299:'7', 2468117013:'8',
2476712746:'play', 2574308838:'forward', 2577952149:'4',
2706654902:'stop', 2829002741:'c-up', 2956097083:'back',
3112717386:'5', 3263244773:'ff', 3286088195:'pause',
3363767978:'c-down', 3468076364:'vol-', 3491068358:'stop',
3593710134:'c-left', 3708232515:'3', 3734134565:'back',
3766109107:'TV', 3798010010:'play', 3869937700:'menu',
3872715523:'7', 3885097091:'2', 3895301587:'text',
3931058739:'mute', 3983900853:'c-right', 4032250885:'4',
4041913909:'vol+', 4207017660:'9', 4227138677:'back',
4294027955:'3'}
def callback(hash):
if hash in hashes:
print("key={} hash={}".format(hashes[hash], hash));
pi = pigpio.pi()
ir = ir_hasher.hasher(pi, 7, callback, 5)
print("ctrl c to exit");
time.sleep(300)
pi.stop()

View File

@ -0,0 +1,2 @@
Script to transmit the morse code corresponding to a text string.

View File

@ -0,0 +1,72 @@
#!/usr/bin/env python
import pigpio
morse={
'a':'.-' , 'b':'-...' , 'c':'-.-.' , 'd':'-..' , 'e':'.' ,
'f':'..-.' , 'g':'--.' , 'h':'....' , 'i':'..' , 'j':'.---' ,
'k':'-.-' , 'l':'.-..' , 'm':'--' , 'n':'-.' , 'o':'---' ,
'p':'.--.' , 'q':'--.-' , 'r':'.-.' , 's':'...' , 't':'-' ,
'u':'..-' , 'v':'...-' , 'w':'.--' , 'x':'-..-' , 'y':'-.--' ,
'z':'--..' , '1':'.----', '2':'..---', '3':'...--', '4':'....-',
'5':'.....', '6':'-....', '7':'--...', '8':'---..', '9':'----.',
'0':'-----'}
GPIO=22
MICROS=100000
NONE=0
DASH=3
DOT=1
GAP=1
LETTER_GAP=3-GAP
WORD_GAP=7-LETTER_GAP
def transmit_string(pi, gpio, str):
pi.wave_clear() # start a new waveform
wf=[]
for C in str:
c=C.lower()
print(c)
if c in morse:
k = morse[c]
for x in k:
if x == '.':
wf.append(pigpio.pulse(1<<gpio, NONE, DOT * MICROS))
else:
wf.append(pigpio.pulse(1<<gpio, NONE, DASH * MICROS))
wf.append(pigpio.pulse(NONE, 1<<gpio, GAP * MICROS))
wf.append(pigpio.pulse(NONE, 1<<gpio, LETTER_GAP * MICROS))
elif c == ' ':
wf.append(pigpio.pulse(NONE, 1<<gpio, WORD_GAP * MICROS))
pi.wave_add_generic(wf)
pi.wave_tx_start()
pi = pigpio.pi()
pi.set_mode(GPIO, pigpio.OUTPUT)
transmit_string(pi, GPIO, "Now is the winter of our discontent")
while pi.wave_tx_busy():
pass
transmit_string(pi, GPIO, "made glorious summer by this sun of York")
while pi.wave_tx_busy():
pass
pi.stop()

View File

@ -0,0 +1,64 @@
#!/usr/bin/env python
# 2014-08-26 PCF8591.py
import time
import curses
import pigpio
# sudo pigpiod
# ./PCF8591.py
# Connect Pi 5V - VCC, Ground - Ground, SDA - SDA, SCL - SCL.
YL_40=0x48
pi = pigpio.pi() # Connect to local Pi.
handle = pi.i2c_open(1, YL_40, 0)
stdscr = curses.initscr()
curses.noecho()
curses.cbreak()
aout = 0
stdscr.addstr(10, 0, "Brightness")
stdscr.addstr(12, 0, "Temperature")
stdscr.addstr(14, 0, "AOUT->AIN2")
stdscr.addstr(16, 0, "Resistor")
stdscr.nodelay(1)
try:
while True:
for a in range(0,4):
aout = aout + 1
pi.i2c_write_byte_data(handle, 0x40 | ((a+1) & 0x03), aout&0xFF)
v = pi.i2c_read_byte(handle)
hashes = v / 4
spaces = 64 - hashes
stdscr.addstr(10+a*2, 12, str(v) + ' ')
stdscr.addstr(10+a*2, 16, '#' * hashes + ' ' * spaces )
stdscr.refresh()
time.sleep(0.04)
c = stdscr.getch()
if c != curses.ERR:
break
except:
pass
curses.nocbreak()
curses.echo()
curses.endwin()
pi.i2c_close(handle)
pi.stop()

View File

@ -0,0 +1,2 @@
Script to display readings from the (I2C) PCF8591.

View File

@ -0,0 +1,2 @@
Script to benchmark the pigpio Python module's performance.

View File

@ -0,0 +1,91 @@
#!/usr/bin/env python
#
# WARNING!
#
##############################################################################
# #
# Unless you know what you are doing don't run this script with anything #
# connected to the gpios. You will CAUSE damage. #
# #
##############################################################################
#
# WARNING!
#
import time
import pigpio
delay = 30
class gpioTest:
def __init__(self, pi, gpio, edge, freq, duty):
self.pi = pi
self.gpio = gpio
self.edge = edge
self.freq = freq
self.duty = duty
self.calls = 0
def cb(self, g, t, l):
self.calls = self.calls + 1
# print g,t,l
def num(self):
return self.calls
def start(self):
self.pi.set_PWM_frequency(self.gpio, self.freq)
self.pi.set_PWM_range(self.gpio, 25)
self.pi.set_PWM_dutycycle(self.gpio, self.duty)
self.n = self.pi.callback(self.gpio, self.edge, self.cb)
def stop(self):
self.pi.set_PWM_dutycycle(self.gpio, 0)
self.n.cancel()
pi = pigpio.pi()
t1 = gpioTest(pi, 4, pigpio.EITHER_EDGE, 4000, 1)
t2 = gpioTest(pi, 7, pigpio.RISING_EDGE, 8000, 2)
t3 = gpioTest(pi, 8, pigpio.FALLING_EDGE, 8000, 3)
t4 = gpioTest(pi, 9, pigpio.EITHER_EDGE, 4000, 4)
t5 = gpioTest(pi,10, pigpio.RISING_EDGE, 8000, 5)
t6 = gpioTest(pi,11, pigpio.FALLING_EDGE, 8000, 6)
t7 = gpioTest(pi,14, pigpio.EITHER_EDGE, 4000, 7)
t8 = gpioTest(pi,15, pigpio.RISING_EDGE, 8000, 8)
t9 = gpioTest(pi,17, pigpio.FALLING_EDGE, 8000, 9)
t10 = gpioTest(pi,18, pigpio.EITHER_EDGE, 4000, 10)
t11 = gpioTest(pi,22, pigpio.RISING_EDGE, 8000, 11)
t12 = gpioTest(pi,23, pigpio.FALLING_EDGE, 8000, 12)
t13 = gpioTest(pi,24, pigpio.EITHER_EDGE, 4000, 13)
t14 = gpioTest(pi,25, pigpio.RISING_EDGE, 8000, 14)
# R1: 0 1 4 7 8 9 10 11 14 15 17 18 21 22 23 24 25
# R2: 2 3 4 7 8 9 10 11 14 15 17 18 22 23 24 25 27 28 29 30 31
tests = [t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13, t14]
for i in tests: i.start()
time.sleep(delay)
for i in tests: i.stop()
pi.stop()
tot = 0
msg = ""
for i in tests:
tot += i.num()
msg += str(i.num()) + " "
print(msg)
print("eps={} ({}/{})".format(tot/delay, tot, delay))

View File

@ -0,0 +1,2 @@
Class to decode a mechanical rotary encoder.

View File

@ -0,0 +1,135 @@
#!/usr/bin/env python
import pigpio
class decoder:
"""Class to decode mechanical rotary encoder pulses."""
def __init__(self, pi, gpioA, gpioB, callback):
"""
Instantiate the class with the pi and gpios connected to
rotary encoder contacts A and B. The common contact
should be connected to ground. The callback is
called when the rotary encoder is turned. It takes
one parameter which is +1 for clockwise and -1 for
counterclockwise.
EXAMPLE
import time
import pigpio
import rotary_encoder
pos = 0
def callback(way):
global pos
pos += way
print("pos={}".format(pos))
pi = pigpio.pi()
decoder = rotary_encoder.decoder(pi, 7, 8, callback)
time.sleep(300)
decoder.cancel()
pi.stop()
"""
self.pi = pi
self.gpioA = gpioA
self.gpioB = gpioB
self.callback = callback
self.levA = 0
self.levB = 0
self.lastGpio = None
self.pi.set_mode(gpioA, pigpio.INPUT)
self.pi.set_mode(gpioB, pigpio.INPUT)
self.pi.set_pull_up_down(gpioA, pigpio.PUD_UP)
self.pi.set_pull_up_down(gpioB, pigpio.PUD_UP)
self.cbA = self.pi.callback(gpioA, pigpio.EITHER_EDGE, self._pulse)
self.cbB = self.pi.callback(gpioB, pigpio.EITHER_EDGE, self._pulse)
def _pulse(self, gpio, level, tick):
"""
Decode the rotary encoder pulse.
+---------+ +---------+ 0
| | | |
A | | | |
| | | |
+---------+ +---------+ +----- 1
+---------+ +---------+ 0
| | | |
B | | | |
| | | |
----+ +---------+ +---------+ 1
"""
if gpio == self.gpioA:
self.levA = level
else:
self.levB = level;
if gpio != self.lastGpio: # debounce
self.lastGpio = gpio
if gpio == self.gpioA and level == 1:
if self.levB == 1:
self.callback(1)
elif gpio == self.gpioB and level == 1:
if self.levA == 1:
self.callback(-1)
def cancel(self):
"""
Cancel the rotary encoder decoder.
"""
self.cbA.cancel()
self.cbB.cancel()
if __name__ == "__main__":
import time
import pigpio
import rotary_encoder
pos = 0
def callback(way):
global pos
pos += way
print("pos={}".format(pos))
pi = pigpio.pi()
decoder = rotary_encoder.decoder(pi, 7, 8, callback)
time.sleep(300)
decoder.cancel()
pi.stop()

View File

@ -1 +0,0 @@
pigpio Python examples

View File

@ -0,0 +1,2 @@
Class to read sonar rangers with separate trigger and echo pins.

View File

@ -0,0 +1,114 @@
#!/usr/bin/env python
import time
import pigpio
class ranger:
"""
This class encapsulates a type of acoustic ranger. In particular
the type of ranger with separate trigger and echo pins.
A pulse on the trigger initiates the sonar ping and shortly
afterwards a sonar pulse is transmitted and the echo pin
goes high. The echo pins stays high until a sonar echo is
received (or the response times-out). The time between
the high and low edges indicates the sonar round trip time.
"""
def __init__(self, pi, trigger, echo):
"""
The class is instantiated with the Pi to use and the
gpios connected to the trigger and echo pins.
"""
self.pi = pi
self._trig = trigger
self._echo = echo
self._ping = False
self._high = None
self._time = None
self._triggered = False
self._trig_mode = pi.get_mode(self._trig)
self._echo_mode = pi.get_mode(self._echo)
pi.set_mode(self._trig, pigpio.OUTPUT)
pi.set_mode(self._echo, pigpio.INPUT)
self._cb = pi.callback(self._trig, pigpio.EITHER_EDGE, self._cbf)
self._cb = pi.callback(self._echo, pigpio.EITHER_EDGE, self._cbf)
self._inited = True
def _cbf(self, gpio, level, tick):
if gpio == self._trig:
if level == 0: # trigger sent
self._triggered = True
self._high = None
else:
if self._triggered:
if level == 1:
self._high = tick
else:
if self._high is not None:
self._time = tick - self._high
self._high = None
self._ping = True
def read(self):
"""
Triggers a reading. The returned reading is the number
of microseconds for the sonar round-trip.
round trip cms = round trip time / 1000000.0 * 34030
"""
if self._inited:
self._ping = False
self.pi.gpio_trigger(self._trig)
start = time.time()
while not self._ping:
if (time.time()-start) > 5.0:
return 20000
time.sleep(0.001)
return self._time
else:
return None
def cancel(self):
"""
Cancels the ranger and returns the gpios to their
original mode.
"""
if self._inited:
self._inited = False
self._cb.cancel()
self.pi.set_mode(self._trig, self._trig_mode)
self.pi.set_mode(self._echo, self._echo_mode)
if __name__ == "__main__":
import time
import pigpio
import sonar_trigger_echo
pi = pigpio.pi()
sonar = sonar_trigger_echo.ranger(pi, 23, 18)
end = time.time() + 600.0
r = 1
while time.time() < end:
print("{} {}".format(r, sonar.read()))
r += 1
time.sleep(0.03)
sonar.cancel()
pi.stop()

View File

@ -0,0 +1,2 @@
Class to send and receive radio messages compatible with the Virtual Wire library for Arduinos.

View File

@ -0,0 +1,372 @@
#!/usr/bin/env python
"""
This module provides a 313MHz/434MHz radio interface compatible
with the Virtual Wire library used on Arduinos.
It has been tested between a Pi, TI Launchpad, and Arduino Pro Mini.
"""
# 2014-08-14
# vw.py
import time
import pigpio
MAX_MESSAGE_BYTES=77
MIN_BPS=50
MAX_BPS=10000
_HEADER=[0x2a, 0x2a, 0x2a, 0x2a, 0x2a, 0x2a, 0x38, 0x2c]
_CTL=3
_SYMBOL=[
0x0d, 0x0e, 0x13, 0x15, 0x16, 0x19, 0x1a, 0x1c,
0x23, 0x25, 0x26, 0x29, 0x2a, 0x2c, 0x32, 0x34]
def _sym2nibble(symbol):
for nibble in range(16):
if symbol == _SYMBOL[nibble]:
return nibble
return 0
def _crc_ccitt_update(crc, data):
data = data ^ (crc & 0xFF);
data = (data ^ (data << 4)) & 0xFF;
return (
(((data << 8) & 0xFFFF) | (crc >> 8)) ^
((data >> 4) & 0x00FF) ^ ((data << 3) & 0xFFFF)
)
class tx():
def __init__(self, pi, txgpio, bps=2000):
"""
Instantiate a transmitter with the Pi, the transmit gpio,
and the bits per second (bps). The bps defaults to 2000.
The bps is constrained to be within MIN_BPS to MAX_BPS.
"""
self.pi = pi
self.txbit = (1<<txgpio)
if bps < MIN_BPS:
bps = MIN_BPS
elif bps > MAX_BPS:
bps = MAX_BPS
self.mics = int(1000000 / bps)
self.wave_id = None
pi.wave_add_new()
pi.set_mode(txgpio, pigpio.OUTPUT)
def _nibble(self, nibble):
for i in range(6):
if nibble & (1<<i):
self.wf.append(pigpio.pulse(self.txbit, 0, self.mics))
else:
self.wf.append(pigpio.pulse(0, self.txbit, self.mics))
def _byte(self, crc, byte):
self._nibble(_SYMBOL[byte>>4])
self._nibble(_SYMBOL[byte&0x0F])
return _crc_ccitt_update(crc, byte)
def put(self, data):
"""
Transmit a message. If the message is more than
MAX_MESSAGE_BYTES in size it is discarded. If a message
is currently being transmitted it is aborted and replaced
with the new message. True is returned if message
transmission has successfully started. False indicates
an error.
"""
if len(data) > MAX_MESSAGE_BYTES:
return False
self.wf = []
self.cancel()
for i in _HEADER:
self._nibble(i)
crc = self._byte(0xFFFF, len(data)+_CTL)
for i in data:
if type(i) == type(""):
v = ord(i)
else:
v = i
crc = self._byte(crc, v)
crc = ~crc
self._byte(0, crc&0xFF)
self._byte(0, crc>>8)
self.pi.wave_add_generic(self.wf)
self.wave_id = self.pi.wave_create()
if self.wave_id >= 0:
self.pi.wave_send_once(self.wave_id)
return True
else:
return False
def ready(self):
"""
Returns True if a new message may be transmitted.
"""
return not self.pi.wave_tx_busy()
def cancel(self):
"""
Cancels the wireless transmitter, aborting any message
in progress.
"""
if self.wave_id is not None:
self.pi.wave_tx_stop()
self.pi.wave_delete(self.wave_id)
self.pi.wave_add_new()
self.wave_id = None
class rx():
def __init__(self, pi, rxgpio, bps=2000):
"""
Instantiate a receiver with the Pi, the receive gpio, and
the bits per second (bps). The bps defaults to 2000.
The bps is constrained to be within MIN_BPS to MAX_BPS.
"""
self.pi = pi
self.rxgpio = rxgpio
self.messages = []
self.bad_CRC = 0
if bps < MIN_BPS:
bps = MIN_BPS
elif bps > MAX_BPS:
bps = MAX_BPS
slack = 0.20
self.mics = int(1000000 / bps)
slack_mics = int(slack * self.mics)
self.min_mics = self.mics - slack_mics # Shortest legal edge.
self.max_mics = (self.mics + slack_mics) * 4 # Longest legal edge.
self.timeout = 8 * self.mics / 1000 # 8 bits time in ms.
if self.timeout < 8:
self.timeout = 8
self.last_tick = None
self.good = 0
self.bits = 0
self.token = 0
self.in_message = False
self.message = [0]*(MAX_MESSAGE_BYTES+_CTL)
self.message_len = 0
self.byte = 0
pi.set_mode(rxgpio, pigpio.INPUT)
self.cb = pi.callback(rxgpio, pigpio.EITHER_EDGE, self._cb)
def _calc_crc(self):
crc = 0xFFFF
for i in range(self.message_length):
crc = _crc_ccitt_update(crc, self.message[i])
return crc
def _insert(self, bits, level):
for i in range(bits):
self.token >>= 1
if level == 0:
self.token |= 0x800
if self.in_message:
self.bits += 1
if self.bits >= 12: # Complete token.
byte = (
_sym2nibble(self.token & 0x3f) << 4 |
_sym2nibble(self.token >> 6))
if self.byte == 0:
self.message_length = byte
if byte > (MAX_MESSAGE_BYTES+_CTL):
self.in_message = False # Abort message.
return
self.message[self.byte] = byte
self.byte += 1
self.bits = 0
if self.byte >= self.message_length:
self.in_message = False
self.pi.set_watchdog(self.rxgpio, 0)
crc = self._calc_crc()
if crc == 0xF0B8: # Valid CRC.
self.messages.append(
self.message[1:self.message_length-2])
else:
self.bad_CRC += 1
else:
if self.token == 0xB38: # Start message token.
self.in_message = True
self.pi.set_watchdog(self.rxgpio, self.timeout)
self.bits = 0
self.byte = 0
def _cb(self, gpio, level, tick):
if self.last_tick is not None:
if level == pigpio.TIMEOUT:
self.pi.set_watchdog(self.rxgpio, 0) # Switch watchdog off.
if self.in_message:
self._insert(4, not self.last_level)
self.good = 0
self.in_message = False
else:
edge = pigpio.tickDiff(self.last_tick, tick)
if edge < self.min_mics:
self.good = 0
self.in_message = False
elif edge > self.max_mics:
if self.in_message:
self._insert(4, level)
self.good = 0
self.in_message = False
else:
self.good += 1
if self.good > 8:
bitlen = (100 * edge) / self.mics
if bitlen < 140:
bits = 1
elif bitlen < 240:
bits = 2
elif bitlen < 340:
bits = 3
else:
bits = 4
self._insert(bits, level)
self.last_tick = tick
self.last_level = level
def get(self):
"""
Returns the next unread message, or None if none is avaiable.
"""
if len(self.messages):
return self.messages.pop(0)
else:
return None
def ready(self):
"""
Returns True if there is a message available to be read.
"""
return len(self.messages)
def cancel(self):
"""
Cancels the wireless receiver.
"""
if self.cb is not None:
self.cb.cancel()
self.pi.set_watchdog(self.rxgpio, 0)
self.cb = None
if __name__ == "__main__":
import time
import pigpio
import vw
RX=11
TX=25
BPS=2000
pi = pigpio.pi() # Connect to local Pi.
rx = vw.rx(pi, RX, BPS) # Specify Pi, rx gpio, and baud.
tx = vw.tx(pi, TX, BPS) # Specify Pi, tx gpio, and baud.
msg = 0
start = time.time()
while (time.time()-start) < 300:
msg += 1
while not tx.ready():
time.sleep(0.1)
time.sleep(0.2)
tx.put([48, 49, 65, ((msg>>6)&0x3F)+32, (msg&0x3F)+32])
while not tx.ready():
time.sleep(0.1)
time.sleep(0.2)
tx.put("Hello World #{}!".format(msg))
while rx.ready():
print("".join(chr (c) for c in rx.get()))
rx.cancel()
tx.cancel()
pi.stop()

View File

@ -0,0 +1,2 @@
Class to decode a Wiegand code.

View File

@ -0,0 +1,135 @@
#!/usr/bin/env python
import pigpio
class decoder:
"""
A class to read Wiegand codes of an arbitrary length.
The code length and value are returned.
EXAMPLE
#!/usr/bin/env python
import time
import pigpio
import wiegand
def callback(bits, code):
print("bits={} code={}".format(bits, code))
pi = pigpio.pi()
w = wiegand.decoder(pi, 14, 15, callback)
time.sleep(300)
w.cancel()
pi.stop()
"""
def __init__(self, pi, gpio_0, gpio_1, callback, bit_timeout=5):
"""
Instantiate with the pi, gpio for 0 (green wire), the gpio for 1
(white wire), the callback function, and the bit timeout in
milliseconds which indicates the end of a code.
The callback is passed the code length in bits and the value.
"""
self.pi = pi
self.gpio_0 = gpio_0
self.gpio_1 = gpio_1
self.callback = callback
self.bit_timeout = bit_timeout
self.in_code = False
self.pi.set_mode(gpio_0, pigpio.INPUT)
self.pi.set_mode(gpio_1, pigpio.INPUT)
self.pi.set_pull_up_down(gpio_0, pigpio.PUD_UP)
self.pi.set_pull_up_down(gpio_1, pigpio.PUD_UP)
self.cb_0 = self.pi.callback(gpio_0, pigpio.FALLING_EDGE, self._cb)
self.cb_1 = self.pi.callback(gpio_1, pigpio.FALLING_EDGE, self._cb)
def _cb(self, gpio, level, tick):
"""
Accumulate bits until both gpios 0 and 1 timeout.
"""
if level < pigpio.TIMEOUT:
if self.in_code == False:
self.bits = 1
self.num = 0
self.in_code = True
self.code_timeout = 0
self.pi.set_watchdog(self.gpio_0, self.bit_timeout)
self.pi.set_watchdog(self.gpio_1, self.bit_timeout)
else:
self.bits += 1
self.num = self.num << 1
if gpio == self.gpio_0:
self.code_timeout = self.code_timeout & 2 # clear gpio 0 timeout
else:
self.code_timeout = self.code_timeout & 1 # clear gpio 1 timeout
self.num = self.num | 1
else:
if self.in_code:
if gpio == self.gpio_0:
self.code_timeout = self.code_timeout | 1 # timeout gpio 0
else:
self.code_timeout = self.code_timeout | 2 # timeout gpio 1
if self.code_timeout == 3: # both gpios timed out
self.pi.set_watchdog(self.gpio_0, 0)
self.pi.set_watchdog(self.gpio_1, 0)
self.in_code = False
self.callback(self.bits, self.num)
def cancel(self):
"""
Cancel the Wiegand decoder.
"""
self.cb_0.cancel()
self.cb_1.cancel()
if __name__ == "__main__":
import time
import pigpio
import wiegand
def callback(bits, value):
print("bits={} value={}".format(bits, value))
pi = pigpio.pi()
w = wiegand.decoder(pi, 14, 15, callback)
time.sleep(300)
w.cancel()
pi.stop()