1+ import time
2+
3+ PRINT_FLOOR = 5
4+ CONSOLE = True
5+ DEBUG = True
6+
7+
18class PuffDetector :
29 def __init__ (self , min_pressure = 8 , high_pressure = 20 ):
310 self .high_pressure = high_pressure
411 self .min_pressure = min_pressure
512
13+ self .start_polarity = 0
14+ self .peak_level = 0
15+ self .counter = 0
16+ self .duration = 0
17+ self .puff_start = 0
18+
619 @classmethod
720 def rolling_average (cls , measurements , window_size = 3 ):
821 # print("measurements", measurements)
922 window = measurements [- window_size :]
1023
1124 return sum (window ) / window_size
1225
13- @classmethod
14- def slope (cls , a , b ):
15-
16- if a > b :
17- return 1
18- elif a < b :
19- return - 1
20- else :
21- return 0
22-
23- @classmethod
24- def direction (cls , measurements ): # requires 6 measurements
25- average = cls .rolling_average (measurements )
26- prev_average = cls .rolling_average (measurements [- 6 :- 3 ])
27- # print()
28- # print("measurements:", measurements)
29- # print("prev_average", prev_average)
30- # print("average:", average)
31- current_slope = cls .slope (average , prev_average )
32- # print("slope:", current_slope)
33- return current_slope
34-
35- @classmethod
36- def direction_changed (cls , measurements , prev_direction ):
37- direction = cls .direction (measurements )
38- return prev_direction != direction
39-
40- def catagorize_pressure (self , pressure , prev_pressure ):
26+ def catagorize_pressure (self , pressure ):
4127 """determine the strength and polarity of the pressure reading"""
4228 level = 0
43- direction = 0
4429 polarity = 0
4530 abs_pressure = abs (pressure )
4631
@@ -55,16 +40,11 @@ def catagorize_pressure(self, pressure, prev_pressure):
5540 else :
5641 polarity = - 1
5742
58- if pressure > prev_pressure :
59- direction = 1
60- if pressure < prev_pressure :
61- direction = - 1
62-
63- return (polarity , level , direction )
43+ return (polarity , level )
6444
6545 @staticmethod
6646 def pressure_string (pressure_type ):
67- polarity , level , direction = pressure_type # pylint:disable=unused-variable
47+ polarity , level = pressure_type # pylint:disable=unused-variable
6848 pressure_str = "HIGH"
6949 if level == 0 or polarity == 0 :
7050 return ""
@@ -80,9 +60,51 @@ def pressure_string(pressure_type):
8060 pressure_str += "SIP"
8161 return pressure_str
8262
63+ def check_for_puff (self , current_pressure ):
64+ puff_polarity = None
65+ puff_peak_level = None
66+ puff_duration = None
67+ #######################
68+ polarity , level = self .catagorize_pressure (current_pressure )
69+
70+ # if (polarity != 0) or (level != 0):
71+ if abs (current_pressure ) > PRINT_FLOOR :
72+ if self .counter % 4 == 0 :
73+ if DEBUG and CONSOLE :
74+ print ("\t \t \t pressure:" , current_pressure )
75+
76+ if level != 0 and self .start_polarity == 0 : ###
77+ self .start_polarity = polarity
78+ self .puff_start = time .monotonic ()
79+ puff_polarity = self .start_polarity
80+
81+ if self .start_polarity != 0 :
82+ if level > self .peak_level :
83+ self .peak_level = level
84+
85+ if (level == 0 ) and (self .start_polarity != 0 ):
86+ self .duration = time .monotonic () - self .puff_start
87+
88+ puff_polarity = self .start_polarity
89+ puff_peak_level = self .peak_level
90+ puff_duration = self .duration
91+
92+ self .start_polarity = 0
93+ self .peak_level = 0
94+ self .duration = 0
95+ self .counter += 1
96+ return (puff_polarity , puff_peak_level , puff_duration )
97+ ##############################################
98+
8399
84100# pylint:disable=pointless-string-statement
85101"""
102+ pressure_list = []
103+ prev_pressure_type = tuple()
104+ prev_polarity = None
105+ current_level = 0
106+ prev_level = 0
107+
86108 def old_detect(self):
87109 if pressure_type != prev_pressure_type:
88110 puff_end = time.monotonic()
@@ -114,4 +136,6 @@ def old_detect(self):
114136 if CONSOLE: print("____________________")
115137 prev_pressure_type = pressure_type
116138 prev_duration = puff_duration
139+ prev_level = level
140+ prev_level = level
117141"""
0 commit comments