Files
firegex-traffic-viewer/backend/utils.py

270 lines
9.7 KiB
Python
Raw Normal View History

from hashlib import md5
2022-07-08 11:35:12 +02:00
import traceback
from typing import Dict
2022-06-13 18:44:11 +02:00
from proxy import Filter, Proxy
import os, sqlite3, socket, asyncio, re
import secrets, json
2022-06-13 18:44:11 +02:00
from base64 import b64decode
LOCALHOST_IP = socket.gethostbyname(os.getenv("LOCALHOST_IP","127.0.0.1"))
2022-06-13 18:44:11 +02:00
class SQLite():
def __init__(self, db_name) -> None:
self.conn = None
self.cur = None
self.db_name = db_name
self.schema = {
'services': {
'service_id': 'VARCHAR(100) PRIMARY KEY',
'status': 'VARCHAR(100) NOT NULL',
'port': 'INT NOT NULL CHECK(port > 0 and port < 65536)',
'name': 'VARCHAR(100) NOT NULL UNIQUE',
'ipv6': 'BOOLEAN NOT NULL CHECK (ipv6 IN (0, 1)) DEFAULT 0',
'proto': 'VARCHAR(3) NOT NULL CHECK (proto IN ("tcp", "udp"))',
'ip_int': 'VARCHAR(100) NOT NULL',
},
'regexes': {
'regex': 'TEXT NOT NULL',
'mode': 'VARCHAR(1) NOT NULL',
'service_id': 'VARCHAR(100) NOT NULL',
'is_blacklist': 'BOOLEAN NOT NULL CHECK (is_blacklist IN (0, 1))',
'blocked_packets': 'INTEGER UNSIGNED NOT NULL DEFAULT 0',
'regex_id': 'INTEGER PRIMARY KEY',
'is_case_sensitive' : 'BOOLEAN NOT NULL CHECK (is_case_sensitive IN (0, 1))',
'active' : 'BOOLEAN NOT NULL CHECK (active IN (0, 1)) DEFAULT 1',
'FOREIGN KEY (service_id)':'REFERENCES services (service_id)',
},
'keys_values': {
'key': 'VARCHAR(100) PRIMARY KEY',
'value': 'VARCHAR(100) NOT NULL',
},
'QUERY':[
"CREATE UNIQUE INDEX IF NOT EXISTS unique_services ON services (ipv6, port, ip_int, proto);",
"CREATE UNIQUE INDEX IF NOT EXISTS unique_regex_service ON regexes (regex,service_id,is_blacklist,mode,is_case_sensitive);"
]
}
self.DB_VER = md5(json.dumps(self.schema).encode()).hexdigest()
2022-06-13 18:44:11 +02:00
def connect(self) -> None:
try:
2022-06-28 16:02:52 +02:00
self.conn = sqlite3.connect(self.db_name, check_same_thread = False)
2022-06-13 18:44:11 +02:00
except Exception:
2022-06-28 16:02:52 +02:00
with open(self.db_name, 'x'):
2022-06-13 18:44:11 +02:00
pass
2022-06-28 16:02:52 +02:00
self.conn = sqlite3.connect(self.db_name, check_same_thread = False)
2022-06-28 13:26:06 +02:00
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
self.conn.row_factory = dict_factory
2022-06-13 18:44:11 +02:00
def disconnect(self) -> None:
if self.conn: self.conn.close()
2022-06-13 18:44:11 +02:00
2022-06-28 13:36:17 +02:00
def create_schema(self, tables = {}) -> None:
2022-06-13 18:44:11 +02:00
cur = self.conn.cursor()
for t in tables:
if t == "QUERY": continue
cur.execute('CREATE TABLE IF NOT EXISTS main.{}({});'.format(t, ''.join([(c + ' ' + tables[t][c] + ', ') for c in tables[t]])[:-2]))
if "QUERY" in tables: [cur.execute(qry) for qry in tables["QUERY"]]
2022-06-13 18:44:11 +02:00
cur.close()
2022-06-28 13:26:06 +02:00
def query(self, query, *values):
2022-06-13 18:44:11 +02:00
cur = self.conn.cursor()
try:
2022-07-08 11:35:12 +02:00
cur.execute(query, values)
return cur.fetchall()
2022-06-13 18:44:11 +02:00
finally:
cur.close()
try: self.conn.commit()
except Exception: pass
def delete(self):
self.disconnect()
os.remove(self.db_name)
def init(self):
self.connect()
try:
current_ver = self.query("SELECT value FROM keys_values WHERE key = 'DB_VERSION'")[0]['value']
if current_ver != self.DB_VER: raise Exception("DB_VERSION is not correct")
except Exception:
self.delete()
self.connect()
self.create_schema(self.schema)
self.query("INSERT INTO keys_values (key, value) VALUES ('DB_VERSION', ?)", self.DB_VER)
2022-06-13 18:44:11 +02:00
class KeyValueStorage:
def __init__(self, db):
self.db = db
def get(self, key):
2022-06-28 13:26:06 +02:00
q = self.db.query('SELECT value FROM keys_values WHERE key = ?', key)
2022-06-13 18:44:11 +02:00
if len(q) == 0:
return None
else:
2022-06-28 13:26:06 +02:00
return q[0]["value"]
2022-06-13 18:44:11 +02:00
def put(self, key, value):
if self.get(key) is None:
2022-06-28 13:26:06 +02:00
self.db.query('INSERT INTO keys_values (key, value) VALUES (?, ?);', key, str(value))
2022-06-13 18:44:11 +02:00
else:
2022-06-28 13:26:06 +02:00
self.db.query('UPDATE keys_values SET value=? WHERE key = ?;', str(value), key)
2022-06-13 18:44:11 +02:00
class STATUS:
2022-07-07 21:56:34 +02:00
STOP = "stop"
2022-06-13 18:44:11 +02:00
ACTIVE = "active"
class ServiceNotFoundException(Exception): pass
2022-06-13 18:44:11 +02:00
2022-06-28 19:38:28 +02:00
class ServiceManager:
def __init__(self, srv, db):
self.srv = srv
2022-06-28 19:38:28 +02:00
self.db = db
self.proxy = Proxy(srv)
2022-07-07 21:56:34 +02:00
self.status = STATUS.STOP
2022-06-28 19:38:28 +02:00
self.filters = {}
self._update_filters_from_db()
2022-06-28 19:38:28 +02:00
self.lock = asyncio.Lock()
self.starter = None
2022-06-28 19:38:28 +02:00
def _update_filters_from_db(self):
res = self.db.query("""
2022-06-28 13:26:06 +02:00
SELECT
regex, mode, regex_id id, is_blacklist,
2022-06-28 13:26:06 +02:00
blocked_packets n_packets, is_case_sensitive
2022-07-10 15:05:56 +02:00
FROM regexes WHERE service_id = ? AND active=1;
""", self.srv["service_id"])
2022-06-13 18:44:11 +02:00
2022-06-28 19:38:28 +02:00
#Filter check
old_filters = set(self.filters.keys())
new_filters = set([f["id"] for f in res])
#remove old filters
for f in old_filters:
if not f in new_filters:
del self.filters[f]
for f in new_filters:
if not f in old_filters:
filter_info = [ele for ele in res if ele["id"] == f][0]
self.filters[f] = Filter(
is_case_sensitive=filter_info["is_case_sensitive"],
c_to_s=filter_info["mode"] in ["C","B"],
s_to_c=filter_info["mode"] in ["S","B"],
is_blacklist=filter_info["is_blacklist"],
regex=b64decode(filter_info["regex"]),
blocked_packets=filter_info["n_packets"],
code=f
)
2022-07-08 11:35:12 +02:00
self.proxy.set_filters(self.filters.values())
2022-06-13 18:44:11 +02:00
def __update_status_db(self, status):
self.db.query("UPDATE services SET status = ? WHERE service_id = ?;", status, self.srv["service_id"])
2022-06-13 18:44:11 +02:00
2022-06-28 19:38:28 +02:00
async def next(self,to):
2022-06-30 15:58:03 +02:00
async with self.lock:
2022-07-07 21:56:34 +02:00
return self._next(to)
2022-06-30 15:58:03 +02:00
2022-07-07 21:56:34 +02:00
def _next(self, to):
2022-06-30 15:58:03 +02:00
if self.status != to:
# ACTIVE -> PAUSE
2022-07-07 21:56:34 +02:00
if (self.status, to) in [(STATUS.ACTIVE, STATUS.STOP)]:
self.proxy.stop()
2022-06-30 15:58:03 +02:00
self._set_status(to)
# PAUSE -> ACTIVE
2022-07-07 21:56:34 +02:00
elif (self.status, to) in [(STATUS.STOP, STATUS.ACTIVE)]:
self.proxy.restart()
2022-06-30 15:58:03 +02:00
self._set_status(to)
2022-06-28 19:38:28 +02:00
def _stats_updater(self,filter:Filter):
self.db.query("UPDATE regexes SET blocked_packets = ? WHERE regex_id = ?;", filter.blocked, filter.code)
2022-07-08 11:35:12 +02:00
def update_stats(self):
for ele in self.proxy.filters:
self._stats_updater(ele)
2022-06-28 19:38:28 +02:00
def _set_status(self,status):
self.status = status
self.__update_status_db(status)
2022-06-28 19:38:28 +02:00
async def update_filters(self):
async with self.lock:
self._update_filters_from_db()
2022-06-13 18:44:11 +02:00
2022-06-28 19:38:28 +02:00
class ProxyManager:
def __init__(self, db:SQLite):
self.db = db
2022-07-08 11:35:12 +02:00
self.proxy_table: Dict[ServiceManager] = {}
2022-06-28 19:38:28 +02:00
self.lock = asyncio.Lock()
2022-07-08 12:04:52 +02:00
self.updater_task = None
2022-07-08 11:35:12 +02:00
2022-07-08 15:13:46 +02:00
def init_updater(self, callback = None):
2022-07-08 12:04:52 +02:00
if not self.updater_task:
2022-07-08 15:13:46 +02:00
self.updater_task = asyncio.create_task(self._stats_updater(callback))
2022-07-08 13:13:30 +02:00
def close_updater(self):
if self.updater_task: self.updater_task.cancel()
2022-06-28 19:38:28 +02:00
async def close(self):
2022-07-08 13:13:30 +02:00
self.close_updater()
if self.updater_task: self.updater_task.cancel()
2022-06-28 19:38:28 +02:00
for key in list(self.proxy_table.keys()):
await self.remove(key)
2022-07-10 15:05:56 +02:00
async def remove(self,srv_id):
2022-06-28 19:38:28 +02:00
async with self.lock:
2022-07-10 15:05:56 +02:00
if srv_id in self.proxy_table:
await self.proxy_table[srv_id].next(STATUS.STOP)
del self.proxy_table[srv_id]
2022-06-13 18:44:11 +02:00
2022-07-08 15:13:46 +02:00
async def init(self, callback = None):
self.init_updater(callback)
2022-07-08 13:13:30 +02:00
await self.reload()
2022-06-28 19:38:28 +02:00
async def reload(self):
async with self.lock:
for srv in self.db.query('SELECT * FROM services;'):
srv_id = srv["service_id"]
if srv_id in self.proxy_table:
2022-06-28 19:38:28 +02:00
continue
2022-06-13 18:44:11 +02:00
self.proxy_table[srv_id] = ServiceManager(srv, self.db)
await self.proxy_table[srv_id].next(srv["status"])
2022-06-18 13:05:59 +02:00
2022-07-08 15:13:46 +02:00
async def _stats_updater(self, callback):
2022-07-08 13:13:30 +02:00
try:
while True:
try:
for key in list(self.proxy_table.keys()):
self.proxy_table[key].update_stats()
except Exception:
traceback.print_exc()
2022-07-08 15:13:46 +02:00
if callback:
if asyncio.iscoroutinefunction(callback): await callback()
else: callback()
await asyncio.sleep(5)
2022-07-08 13:13:30 +02:00
except asyncio.CancelledError:
self.updater_task = None
return
2022-07-10 15:05:56 +02:00
def get(self,srv_id):
if srv_id in self.proxy_table:
return self.proxy_table[srv_id]
else:
2022-07-10 15:05:56 +02:00
raise ServiceNotFoundException()
def refactor_name(name:str):
name = name.strip()
while " " in name: name = name.replace(" "," ")
return name
def gen_service_id(db):
while True:
res = secrets.token_hex(8)
if len(db.query('SELECT 1 FROM services WHERE service_id = ?;', res)) == 0:
break
return res