Static code analysis and corrections

This commit is contained in:
Kristjan Komlosi
2019-07-17 16:06:09 +02:00
parent 674692c2fc
commit 21bfae9fbc
10086 changed files with 2102103 additions and 51 deletions

View File

@@ -1,37 +1,40 @@
# sensors.py - a module for interfacing to the sensors
'''Module for interfacing with TeraHz sensors'''
# Copyright 2019 Kristjan Komloši
# All code in this file is licensed under the ISC license, provided in LICENSE.txt
import serial as ser
import pandas as pd
import smbus2
from sys import exit as ex
import time
class Spectrometer:
'''Class representing the AS7265X specrometer'''
def initializeSensor(self):
'''confirm the sensor is responding and proceed with spectrometer initialization'''
'''confirm the sensor is responding and proceed\
with spectrometer initialization'''
try:
rstring='undefined' # just need it set to a value
rstring = 'undefined' # just need it set to a value
self.setParameters({'gain': 0})
self.serialObject.write(b'AT\n')
rstring=self.serialObject.readline().decode()
rstring = self.serialObject.readline().decode()
if rstring == 'undefined':
raise Exception #sensor didn't respond
raise Exception # sensor didn't respond
if rstring == 'OK':
pass #handshake passed
pass # handshake passed
if rstring == 'ERROR':
raise Exception #sensor is in error state
raise Exception # sensor is in error state
except:
raise Exception('An exception ocurred when performing spectrometer handshake')
ex(1)
raise Exception(
'An exception ocurred when performing spectrometer handshake')
def setParameters(self, parameters):
'''applies the parameters like LED light and gain to the spectrometer'''
try:
if 'it_time' in parameters:
it_time = int(parameters['it_time'])
if it_time <=0 :
if it_time <= 0:
it_time = 1
self.serialObject.write('ATINTTIME={}\n'.format(string(it_time)).encode())
self.serialObject.write(
'ATINTTIME={}\n'.format(str(it_time)).encode())
self.serialObject.readline()
if 'gain' in parameters:
@@ -44,28 +47,29 @@ class Spectrometer:
if 'led' in parameters:
led = bool(parameters['led'])
if led:
led=1
led = 1
else:
led=0
led = 0
self.serialObject.write('ATLED3={}\n'.format(led).encode())
self.serialObject.readline()
except:
raise Exception('An exception occured during spectrometer initialization')
ex(1)
raise Exception(
'An exception occured during spectrometer initialization')
def getData(self):
'''Returns spectral data in a pandas DataFrame.'''
try:
self.serialObject.write(b'ATCDATA\n')
rawresp = self.serialObject.readline().decode()
except:
raise Exception('An exception occurred when polling for spectrometer data')
ex(1)
raise Exception(
'An exception occurred when polling for spectrometer data')
else:
responseorder = [i for i in 'RSTUVWGHIJKLABCDEF']
realorder = [i for i in 'ABCDEFGHRISJTUVWKL']
response = pd.Series([float(i)/35.0 for i in rawresp[:-3].split(',')], index=responseorder)
return pd.DataFrame(response, index=realorder, columns = ['uW/cm^2']).to_dict()['uW/cm^2']
response = pd.Series(
[float(i) / 35.0 for i in rawresp[:-3].split(',')], index=responseorder)
return pd.DataFrame(response, index=realorder, columns=['uW/cm^2']).to_dict()['uW/cm^2']
def __init__(self, path='/dev/ttyUSB0', baudrate=115200, tout=1):
self.path = path
@@ -74,22 +78,26 @@ class Spectrometer:
try:
self.serialObject = ser.Serial(path, baudrate, timeout=tout)
except:
raise Exception('An exception occured when opening the serial port at {}'.format(path))
ex(1)
raise Exception(
'An exception occured when opening the serial port at {}'.format(path))
else:
self.initializeSensor()
class LxMeter:
'''Class representing the APDS-9301 digital photometer.'''
def __init__(self, busNumber=1, addr=0x39):
self.addr = addr
try:
self.bus = smbus2.SMBus(busNumber) # initialize the SMBus interface
# initialize the SMBus interface
self.bus = smbus2.SMBus(busNumber)
except:
raise Exception('An exception occured opening the SMBus {}'.format(self.bus))
raise Exception(
'An exception occured opening the SMBus {}'.format(self.bus))
try:
self.bus.write_byte_data(self.addr, 0xa0, 0x03) # enable the sensor
self.bus.write_byte_data(
self.addr, 0xa0, 0x03) # enable the sensor
self.setGain(16)
except:
raise Exception('An exception occured when enabling lux meter')
@@ -101,13 +109,15 @@ class LxMeter:
temp = self.bus.read_byte_data(self.addr, 0xa1)
self.bus.write_byte_data(self.addr, 0xa1, 0xef & temp)
except:
raise Exception('An exception occured when setting lux meter gain')
raise Exception(
'An exception occured when setting lux meter gain')
if gain == 16:
try:
temp = self.bus.read_byte_data(self.addr, 0xa1)
self.bus.write_byte_data(self.addr, 0xa1, 0x10 | temp)
except:
raise Exception('An exception occured when setting lux meter gain')
raise Exception(
'An exception occured when setting lux meter gain')
else:
raise Exception('Invalid gain')
@@ -118,6 +128,8 @@ class LxMeter:
return 16
if self.bus.read_byte_data(self.addr, 0xa1) & 0x10 == 0x00:
return 1
raise Exception('An error occured when getting lux meter gain')
# Under normal conditions, this raise is unreachable.
except:
raise Exception('An error occured when getting lux meter gain')
@@ -129,14 +141,16 @@ class LxMeter:
temp = self.bus.read_byte_data(self.addr, 0xa1)
self.bus.write_byte_data(self.addr, 0xa1, (temp & 0xfc) | time)
except:
raise Exception('An exception occured setting lux integration time')
raise Exception(
'An exception occured setting lux integration time')
def getIntTime(self):
'''Get the lux sensor integration time.'''
try:
return self.bus.read_byte_data(self.addr, 0xa1) & 0x03
except:
raise Exception('An exception occured getting lux integration time')
raise Exception(
'An exception occured getting lux integration time')
def getData(self):
'''return the calculated lux value'''
@@ -147,31 +161,35 @@ class LxMeter:
raise Exception('An exception occured fetching lux channels')
# scary computations ahead! refer to the apds-9301 datasheet!
if chB/chA <= 0.5 and chB/chA > 0:
lux = 0.0304*chA - 0.062*chA*(chB/chA)**1.4
elif chB/chA <= 0.61 and chB/chA > 0.5:
lux = 0.0224*chA - 0.031*chB
elif chB/chA <=0.8 and chB/chA > 0.61:
lux = 0.0128*chA - 0.0153*chB
elif chB/chA <=1.3 and chB/chA >0.8:
lux = 0.00146*chA - 0.00112*chB
if chB / chA <= 0.5 and chB / chA > 0:
lux = 0.0304 * chA - 0.062 * chA * (chB / chA)**1.4
elif chB / chA <= 0.61 and chB / chA > 0.5:
lux = 0.0224 * chA - 0.031 * chB
elif chB / chA <= 0.8 and chB / chA > 0.61:
lux = 0.0128 * chA - 0.0153 * chB
elif chB / chA <= 1.3 and chB / chA > 0.8:
lux = 0.00146 * chA - 0.00112 * chB
else:
lux = 0
return lux
class UVSensor:
'''Class representing VEML6075 UVA/B meter'''
def __init__(self, bus=1, addr=0x10):
self.addr=addr
self.addr = addr
try:
self.bus = smbus2.SMBus(bus)
except:
raise Exception('An exception occured opening SMBus {}'.format(bus))
raise Exception(
'An exception occured opening SMBus {}'.format(bus))
try:
# enable the sensor and set the integration time
self.bus.write_byte_data(self.addr, 0x00, 0b00010000)
except:
raise Exception('An exception occured when initalizing the UV Sensor')
raise Exception(
'An exception occured when initalizing the UV Sensor')
def getABI(self):
'''Calculates the UVA and UVB irradiances,
@@ -199,15 +217,18 @@ class UVSensor:
# last, calculate the UV index
i = (a + b) / 2
return [a,b,i]
return [a, b, i]
def getA(self):
'''Returns UVA value. A getABI() wrapper.'''
return self.getABI()[0]
def getB(self):
'''Returns UVB value. A getABI() wrapper.'''
return self.getABI()[1]
def getI(self):
'''Returns UV index. A getABI() wrapper.'''
return self.getABI()[2]
def on(self):
@@ -217,7 +238,8 @@ class UVSensor:
# no configurable params = no bitmask
self.bus.write_byte_data(self.addr, 0x00, 0x10)
except:
raise Exception('An exception occured when turning the UV sensor on')
raise Exception(
'An exception occured when turning the UV sensor on')
def off(self):
'''Shuts the UV sensor down.'''
@@ -226,4 +248,5 @@ class UVSensor:
# no configurable params = no bitmask
self.bus.write_byte_data(self.addr, 0x00, 0x11)
except:
raise Exception('An exception occured when shutting the UV sensor down')
raise Exception(
'An exception occured when shutting the UV sensor down')