9
9
10
10
import sqlite3
11
11
import json
12
+ import hashlib
12
13
from datetime import datetime , timedelta
13
14
14
15
TIME_FORMAT_DB = "%Y-%m-%dT%H:%M:%S.%f"
@@ -22,11 +23,19 @@ def __init__(self, base, db_days):
22
23
23
24
self .db = sqlite3 .connect (self .base .config_root + "/predbat.db" )
24
25
self .db_cursor = self .db .cursor ()
25
- self .entity_id_cache = {}
26
26
27
27
self ._cleanup_db ()
28
28
self .log ("db_engine: Started" )
29
29
30
+ def _entity_hash (self , entity_id ):
31
+ """
32
+ Generate a consistent hash for an entity_id
33
+ """
34
+ # Use SHA256 hash and take first 8 bytes as integer for 64-bit signed integer
35
+ # This gives us a deterministic ID that's the same across all instances
36
+ hash_bytes = hashlib .sha256 (entity_id .encode ('utf-8' )).digest ()[:8 ]
37
+ return int .from_bytes (hash_bytes , byteorder = 'big' , signed = True )
38
+
30
39
def _close (self ):
31
40
"""
32
41
Close the database connection
@@ -40,11 +49,39 @@ def _cleanup_db(self):
40
49
"""
41
50
This searches all tables for data older than N days and deletes it
42
51
"""
43
- self .db_cursor .execute ("CREATE TABLE IF NOT EXISTS entities (entity_index INTEGER PRIMARY KEY AUTOINCREMENT, entity_name TEXT KEY UNIQUE)" )
44
- self .db_cursor .execute ("CREATE TABLE IF NOT EXISTS states (id INTEGER PRIMARY KEY AUTOINCREMENT, datetime TEXT KEY, entity_index INTEGER KEY, state TEXT, attributes TEXT, system TEXT, keep TEXT KEY)" )
45
- self .db_cursor .execute ("CREATE TABLE IF NOT EXISTS latest (entity_index INTEGER PRIMARY KEY, datetime TEXT KEY, state TEXT, attributes TEXT, system TEXT, keep TEXT KEY)" )
52
+ # Modified schema: latest table uses hash as primary key, stores entity_id
53
+ # history table references latest via entity_hash foreign key
54
+ self .db_cursor .execute ("""
55
+ CREATE TABLE IF NOT EXISTS latest (
56
+ entity_hash INTEGER PRIMARY KEY,
57
+ entity_id TEXT UNIQUE NOT NULL,
58
+ datetime TEXT,
59
+ state TEXT,
60
+ attributes TEXT,
61
+ system TEXT,
62
+ keep TEXT
63
+ )
64
+ """ )
65
+
66
+ self .db_cursor .execute ("""
67
+ CREATE TABLE IF NOT EXISTS history (
68
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
69
+ entity_hash INTEGER,
70
+ datetime TEXT,
71
+ state TEXT,
72
+ attributes TEXT,
73
+ system TEXT,
74
+ keep TEXT,
75
+ FOREIGN KEY (entity_hash) REFERENCES latest(entity_hash)
76
+ )
77
+ """ )
78
+
79
+ # Create index on entity_hash for faster history queries
80
+ self .db_cursor .execute ("CREATE INDEX IF NOT EXISTS idx_history_entity_hash ON history(entity_hash)" )
81
+ self .db_cursor .execute ("CREATE INDEX IF NOT EXISTS idx_history_datetime ON history(datetime)" )
82
+
46
83
self .db_cursor .execute (
47
- "DELETE FROM states WHERE datetime < ? AND keep != ?" ,
84
+ "DELETE FROM history WHERE datetime < ? AND keep != ?" ,
48
85
(
49
86
self .base .now_utc_real - timedelta (days = self .db_days ),
50
87
"D" ,
@@ -57,11 +94,9 @@ def _get_state_db(self, entity_id):
57
94
Get entity current state from the SQLLite database
58
95
"""
59
96
entity_id = entity_id .lower ()
60
- entity_index = self ._get_entity_index_db (entity_id )
61
- if not entity_index :
62
- return None
97
+ entity_hash = self ._entity_hash (entity_id )
63
98
64
- self .db_cursor .execute ("SELECT datetime, state, attributes FROM latest WHERE entity_index =?" , (entity_index ,))
99
+ self .db_cursor .execute ("SELECT datetime, state, attributes FROM latest WHERE entity_hash =?" , (entity_hash ,))
65
100
res = self .db_cursor .fetchone ()
66
101
if res :
67
102
state = res [1 ]
@@ -74,40 +109,22 @@ def _get_state_db(self, entity_id):
74
109
else :
75
110
return None
76
111
77
- def _get_entity_index_db (self , entity_id ):
78
- """
79
- Get the entity index from the SQLLite database
80
- """
81
- if entity_id in self .entity_id_cache :
82
- return self .entity_id_cache [entity_id ]
83
-
84
- self .db_cursor .execute ("SELECT entity_index FROM entities WHERE entity_name=?" , (entity_id ,))
85
- res = self .db_cursor .fetchone ()
86
- if res :
87
- self .entity_id_cache [entity_id ] = res [0 ]
88
- return res [0 ]
89
- else :
90
- return None
91
-
92
112
def _get_all_entities_db (self ):
93
113
"""
94
114
Get all entity names from the SQLLite database
95
115
"""
96
- self .db_cursor .execute ("SELECT entity_name FROM entities " )
116
+ self .db_cursor .execute ("SELECT entity_id FROM latest " )
97
117
rows = self .db_cursor .fetchall ()
98
118
return [row [0 ] for row in rows ]
99
119
100
120
def _set_state_db (self , entity_id , state , attributes , timestamp ):
101
121
"""
102
122
Records the state of a predbat entity into the SQLLite database
103
- There is one table per Entity created with history recorded
123
+ Uses hash-based entity IDs for consistent references across instances
104
124
"""
105
125
state = str (state )
106
-
107
- # Put the entity_id into entities table if its not in already
108
- self .db_cursor .execute ("INSERT OR IGNORE INTO entities (entity_name) VALUES (?)" , (entity_id ,))
109
- self .db .commit ()
110
- entity_index = self ._get_entity_index_db (entity_id )
126
+ entity_id = entity_id .lower ()
127
+ entity_hash = self ._entity_hash (entity_id )
111
128
112
129
# Convert time to GMT+0
113
130
now_utc = timestamp
@@ -127,9 +144,8 @@ def _set_state_db(self, entity_id, state, attributes, timestamp):
127
144
attributes_record_full_json = json .dumps (attributes_record_full )
128
145
system_json = json .dumps (system )
129
146
130
- # Record the state of the entity
131
- # If the entity value and attributes are unchanged then don't record the new state
132
- self .db_cursor .execute ("SELECT datetime, state, attributes, system, keep FROM states WHERE entity_index=? ORDER BY datetime DESC LIMIT 1" , (entity_index ,))
147
+ # Check if the entity value and attributes are unchanged
148
+ self .db_cursor .execute ("SELECT datetime, state, attributes, system, keep FROM history WHERE entity_hash=? ORDER BY datetime DESC LIMIT 1" , (entity_hash ,))
133
149
last_record = self .db_cursor .fetchone ()
134
150
keep = "D"
135
151
if last_record :
@@ -150,41 +166,45 @@ def _set_state_db(self, entity_id, state, attributes, timestamp):
150
166
else :
151
167
keep = "H"
152
168
153
- # Insert the new state record
169
+ # Insert the new state record using upsert
154
170
try :
155
- self . db_cursor . execute ( "DELETE FROM states WHERE entity_index = ? AND datetime = ?" , ( entity_index , now_utc_txt ))
171
+ # Upsert into the latest table
156
172
self .db_cursor .execute (
157
- "INSERT INTO states (datetime, entity_index, state, attributes, system, keep) VALUES (?, ?, ?, ?, ?, ?)" ,
173
+ """
174
+ INSERT INTO latest (entity_hash, entity_id, datetime, state, attributes, system, keep)
175
+ VALUES (?, ?, ?, ?, ?, ?, ?)
176
+ ON CONFLICT(entity_hash) DO UPDATE SET
177
+ entity_id=excluded.entity_id,
178
+ datetime=excluded.datetime,
179
+ state=excluded.state,
180
+ attributes=excluded.attributes,
181
+ system=excluded.system,
182
+ keep=excluded.keep
183
+ """ ,
158
184
(
185
+ entity_hash ,
186
+ entity_id ,
159
187
now_utc_txt ,
160
- entity_index ,
161
188
state ,
162
- attributes_record_json ,
189
+ attributes_record_full_json ,
163
190
system_json ,
164
191
keep ,
165
192
),
166
193
)
167
- # Upsert into the latest table (replace if exists, insert if not)
194
+
195
+ # Insert into history table using entity_hash
168
196
self .db_cursor .execute (
169
- """
170
- INSERT INTO latest (entity_index, datetime, state, attributes, system, keep)
171
- VALUES (?, ?, ?, ?, ?, ?)
172
- ON CONFLICT(entity_index) DO UPDATE SET
173
- datetime=excluded.datetime,
174
- state=excluded.state,
175
- attributes=excluded.attributes,
176
- system=excluded.system,
177
- keep=excluded.keep
178
- """ ,
197
+ "INSERT INTO history (entity_hash, datetime, state, attributes, system, keep) VALUES (?, ?, ?, ?, ?, ?)" ,
179
198
(
180
- entity_index ,
199
+ entity_hash ,
181
200
now_utc_txt ,
182
201
state ,
183
- attributes_record_full_json ,
202
+ attributes_record_json ,
184
203
system_json ,
185
204
keep ,
186
205
),
187
206
)
207
+
188
208
self .db .commit ()
189
209
except sqlite3 .IntegrityError :
190
210
self .log ("Warn: SQL Integrity error inserting data for {}" .format (entity_id ))
@@ -194,17 +214,14 @@ def _get_history_db(self, sensor, now, days=30):
194
214
Get the history for a sensor from the SQLLite database.
195
215
"""
196
216
start = now - timedelta (days = days )
197
-
198
- entity_index = self ._get_entity_index_db (sensor )
199
- if not entity_index :
200
- self .log ("Warn: Entity {} does not exist" .format (sensor ))
201
- return None
217
+ sensor = sensor .lower ()
218
+ entity_hash = self ._entity_hash (sensor )
202
219
203
220
# Get the history for the sensor, sorted by datetime
204
221
self .db_cursor .execute (
205
- "SELECT datetime, state, attributes FROM states WHERE entity_index = ? AND datetime >= ? ORDER BY datetime" ,
222
+ "SELECT datetime, state, attributes FROM history WHERE entity_hash = ? AND datetime >= ? ORDER BY datetime" ,
206
223
(
207
- entity_index ,
224
+ entity_hash ,
208
225
start .strftime (TIME_FORMAT_DB ),
209
226
),
210
227
)
@@ -219,4 +236,4 @@ def _get_history_db(self, sensor, now, days=30):
219
236
pass
220
237
221
238
history .append ({"last_updated" : item [0 ] + "Z" , "state" : state , "attributes" : attributes })
222
- return [history ]
239
+ return [history ]
0 commit comments