added SMBus Lux meter support
This commit is contained in:
@@ -4,7 +4,7 @@
|
||||
|
||||
import serial as ser
|
||||
import pandas as pd
|
||||
from threading import Timer
|
||||
import smbus2
|
||||
from sys import exit as ex
|
||||
import time
|
||||
|
||||
@@ -22,7 +22,7 @@ class Spectrometer:
|
||||
if rstring == 'ERROR':
|
||||
raise Exception #sensor is in error state
|
||||
except:
|
||||
print('An exception ocurred when performing spectrometer handshake')
|
||||
raise Exception('An exception ocurred when performing spectrometer handshake')
|
||||
ex(1)
|
||||
self.setParameters()
|
||||
|
||||
@@ -52,39 +52,111 @@ class Spectrometer:
|
||||
self.serialObject.write('ATLED3={}\n'.format(led).encode())
|
||||
self.serialObject.readline()
|
||||
except:
|
||||
print('An exception occured during spectrometer initialization')
|
||||
raise Exception('An exception occured during spectrometer initialization')
|
||||
ex(1)
|
||||
|
||||
def startDataCollection(self):
|
||||
def refreshData(self):
|
||||
try:
|
||||
self.serialObject.write(b'ATCDATA\n')
|
||||
rawresp = self.serialObject.readline().decode()
|
||||
except:
|
||||
print('An exception occurred when polling for spectrometer data')
|
||||
raise Exception('An exception occurred when polling for spectrometer data')
|
||||
ex(1)
|
||||
else:
|
||||
responseorder = [i for i in 'RSTUVWGHIJKLABCDEF']
|
||||
realorder = [i for i in 'ABCDEFGHRISJTUVWKL']
|
||||
response = pd.Series([float(i)/35.0 for i in rawresp[:-3].split(',')], index=responseorder)
|
||||
self.data = pd.DataFrame(response, index=realorder, columns = ['uW/cm^2'])
|
||||
self.schedule(self.rrate)
|
||||
|
||||
def schedule(self, refresh):
|
||||
self.timerObject = Timer(refresh, self.startDataCollection)
|
||||
self.timerObject.start()
|
||||
|
||||
|
||||
def __init__(self, path='/dev/ttyUSB0', baudrate=115200, tout=1, rrate=1, params={}):
|
||||
self.path = path
|
||||
self.baudrate = baudrate
|
||||
self.timeout = 1
|
||||
self.rrate = rrate
|
||||
self.parameters = params
|
||||
try:
|
||||
self.serialObject = ser.Serial(path, baudrate, timeout=tout)
|
||||
except:
|
||||
print('An exception occured when opening the serial port at {}'.format(path))
|
||||
raise Exception('An exception occured when opening the serial port at {}'.format(path))
|
||||
ex(1)
|
||||
else:
|
||||
self.initializeSensor()
|
||||
self.startDataCollection()
|
||||
|
||||
|
||||
class LxMeter:
|
||||
def __init__(self, busNumber=1, addr=0x39):
|
||||
self.addr = address
|
||||
try:
|
||||
self.bus = smbus2.SMBus(busNumber) # initialize the SMBus interface
|
||||
except:
|
||||
raise Exception('An exception occured opening the SMBus {}'.format(self.bus))
|
||||
|
||||
try:
|
||||
self.bus.write_byte_data(addr, 0xa0, 0x03) # enable the sensor
|
||||
except:
|
||||
raise Exception('An exception occured when enabling lux meter')
|
||||
|
||||
def setGain(self, gain):
|
||||
'''Set the sensor gain. Either 1 or 16.'''
|
||||
if gain == 1:
|
||||
try:
|
||||
temp = self.bus.read_byte_data(addr, 0xa1)
|
||||
self.bus.write_byte_data(addr, 0xa1, 0xef & temp)
|
||||
except:
|
||||
raise Exception('An exception occured when setting lux meter gain')
|
||||
if gain == 16:
|
||||
try:
|
||||
temp = self.bus.read_byte_data(addr, 0xa1)
|
||||
self.bus.write_byte_data(addr, 0xa1, 0x10 | temp)
|
||||
except:
|
||||
raise Exception('An exception occured when setting lux meter gain')
|
||||
else:
|
||||
raise Exception('Invalid gain')
|
||||
|
||||
def getGain(self):
|
||||
'''Get the gain from the sensor.'''
|
||||
try:
|
||||
if self.bus.read_byte_data(addr, 0xa1) & 0x10 == 0x10:
|
||||
return 16
|
||||
if self.bus.read_byte_data(addr, 0xa1) & 0x10 == 0x00:
|
||||
return 1
|
||||
except:
|
||||
raise Exception('An error occured when getting lux meter gain')
|
||||
|
||||
def setIntTime(self, time):
|
||||
'''Set the lux sensor integration time. 0 to including 2'''
|
||||
if time < 0 or time > 2:
|
||||
raise Exception('Invalid integration time')
|
||||
try:
|
||||
temp = self.bus.read_byte_data(addr, 0xa1)
|
||||
self.bus.write_byte_data(addr, 0xa1, (temp & 0xfc) | time)
|
||||
except:
|
||||
raise Exception('An error occured setting lux integration time')
|
||||
|
||||
def getIntTime(self):
|
||||
'''Get the lux sensor integration time.'''
|
||||
try:
|
||||
return self.bus.read_byte_data(addr, 0xa1) & 0xfc
|
||||
except:
|
||||
raise Exception('An error occured getting lux integration time')
|
||||
|
||||
def getData(self):
|
||||
'''return the calculated lux value'''
|
||||
try:
|
||||
chA = self.bus.read_word_data(addr, 0xac)
|
||||
chB = self.bus.read_word_data(addr, 0xae)
|
||||
except:
|
||||
raise Exception('An error occured fetching lux channels')
|
||||
|
||||
if chB/chA <= 0.5 and chB/chA > 0:
|
||||
lux = (0.0304*chA) - (0.062*chA*((chB/chA)**1.4))
|
||||
else if chB/chA <= 0.61 and chB/chA > 0.5:
|
||||
lux = (0.0224*chA) - (0.031*chB)
|
||||
else if chB/chA <=0.8 and chB/chA > 0.61:
|
||||
lux = (0.0128*chA) - (0.0153*chB)
|
||||
else if chB/chA <=1.3 and chB/chA >0.8:
|
||||
lux = (0.00146*chA) - (0.00112*chB)
|
||||
else:
|
||||
lux = 0
|
||||
return lux
|
||||
|
||||
Reference in New Issue
Block a user