2023-09-22 20:46:50 +02:00
from modules . firewall . models import Rule
from utils import nftables_int_to_json , ip_parse , ip_family , NFTableManager , nftables_json_to_int
class FiregexHijackRule ( ) :
def __init__ ( self , proto : str , ip_src : str , ip_dst : str , port_src_from : int , port_dst_from : int , port_src_to : int , port_dst_to : int , action : str , target : str , id : int ) :
self . id = id
self . target = target
self . proto = proto
self . ip_src = ip_src
self . ip_dst = ip_dst
self . port_src_from = min ( port_src_from , port_src_to )
self . port_dst_from = min ( port_dst_from , port_dst_to )
self . port_src_to = max ( port_src_from , port_src_to )
self . port_dst_to = max ( port_dst_from , port_dst_to )
self . action = action
def __eq__ ( self , o : object ) - > bool :
if isinstance ( o , FiregexHijackRule ) or isinstance ( o , Rule ) :
return self . action == o . action and self . proto == o . proto and \
ip_parse ( self . ip_src ) == ip_parse ( o . ip_src ) and ip_parse ( self . ip_dst ) == ip_parse ( o . ip_dst ) and \
int ( self . port_src_from ) == int ( o . port_src_from ) and int ( self . port_dst_from ) == int ( o . port_dst_from ) and \
int ( self . port_src_to ) == int ( o . port_src_to ) and int ( self . port_dst_to ) == int ( o . port_dst_to )
return False
class FiregexTables ( NFTableManager ) :
rules_chain_in = " firewall_rules_in "
rules_chain_out = " firewall_rules_out "
def __init__ ( self ) :
super ( ) . __init__ ( [
{ " add " : { " chain " : {
" family " : " inet " ,
" table " : self . table_name ,
" name " : self . rules_chain_in ,
" type " : " filter " ,
" hook " : " prerouting " ,
" prio " : - 300 ,
" policy " : " accept "
} } } ,
{ " add " : { " chain " : {
" family " : " inet " ,
" table " : self . table_name ,
" name " : self . rules_chain_out ,
" type " : " filter " ,
" hook " : " postrouting " ,
" prio " : - 300 ,
" policy " : " accept "
} } } ,
] , [
{ " flush " : { " chain " : { " table " : self . table_name , " family " : " inet " , " name " : self . rules_chain_in } } } ,
{ " delete " : { " chain " : { " table " : self . table_name , " family " : " inet " , " name " : self . rules_chain_in } } } ,
{ " flush " : { " chain " : { " table " : self . table_name , " family " : " inet " , " name " : self . rules_chain_out } } } ,
{ " delete " : { " chain " : { " table " : self . table_name , " family " : " inet " , " name " : self . rules_chain_out } } } ,
] )
def delete_all ( self ) :
self . cmd (
{ " flush " : { " chain " : { " table " : self . table_name , " family " : " inet " , " name " : self . rules_chain_in } } } ,
{ " flush " : { " chain " : { " table " : self . table_name , " family " : " inet " , " name " : self . rules_chain_out } } } ,
)
def set ( self , srv : list [ Rule ] ) :
self . delete_all ( )
for ele in srv : self . add ( ele )
def add ( self , srv : Rule ) :
port_filters = [ ]
if srv . proto != " any " :
if srv . port_src_from != 1 or srv . port_src_to != 65535 : #Any Port
port_filters . append ( { ' match ' : { ' left ' : { ' payload ' : { ' protocol ' : str ( srv . proto ) , ' field ' : ' sport ' } } , ' op ' : ' >= ' , ' right ' : int ( srv . port_src_from ) } } )
port_filters . append ( { ' match ' : { ' left ' : { ' payload ' : { ' protocol ' : str ( srv . proto ) , ' field ' : ' sport ' } } , ' op ' : ' <= ' , ' right ' : int ( srv . port_src_to ) } } )
if srv . port_dst_from != 1 or srv . port_dst_to != 65535 : #Any Port
port_filters . append ( { ' match ' : { ' left ' : { ' payload ' : { ' protocol ' : str ( srv . proto ) , ' field ' : ' dport ' } } , ' op ' : ' >= ' , ' right ' : int ( srv . port_dst_from ) } } )
port_filters . append ( { ' match ' : { ' left ' : { ' payload ' : { ' protocol ' : str ( srv . proto ) , ' field ' : ' dport ' } } , ' op ' : ' <= ' , ' right ' : int ( srv . port_dst_to ) } } )
if len ( port_filters ) == 0 :
port_filters . append ( { ' match ' : { ' left ' : { ' payload ' : { ' protocol ' : str ( srv . proto ) , ' field ' : ' sport ' } } , ' op ' : ' != ' , ' right ' : 0 } } ) #filter the protocol if no port is specified
self . cmd ( { " insert " : { " rule " : {
" family " : " inet " ,
" table " : self . table_name ,
" chain " : self . rules_chain_out if srv . output_mode else self . rules_chain_in ,
" expr " : [
{ ' match ' : { ' left ' : { ' payload ' : { ' protocol ' : ip_family ( srv . ip_src ) , ' field ' : ' saddr ' } } , ' op ' : ' == ' , ' right ' : nftables_int_to_json ( srv . ip_src ) } } ,
{ ' match ' : { ' left ' : { ' payload ' : { ' protocol ' : ip_family ( srv . ip_dst ) , ' field ' : ' daddr ' } } , ' op ' : ' == ' , ' right ' : nftables_int_to_json ( srv . ip_dst ) } } ,
2023-09-22 20:56:58 +02:00
] + port_filters +
[ { ' accept ' : None } if srv . action == " accept " else { ' reject ' : { } } if ( srv . action == " reject " and not srv . output_mode ) else { ' drop ' : None } ]
#If srv.output_mode is True, then the rule is in the output chain, so the reject action is not allowed
2023-09-22 20:46:50 +02:00
} } } )