How to Manage Thousands of Lines of Firewall Rules, Using Juniper SRX as an Example

There is special software like Algosec or Tufin, but a simple Python program can do almost the same thing.

Serious problems for large firewalls can be duplicate rules, shadowed rules, and groups of rules that can be combined.

I have prepared a simplified configuration for SRX:

set security policies global policy gl1 match source-address og1
set security policies global policy gl1 match destination-address og12
set security policies global policy gl1 match application junos-ssh
set security policies global policy gl1 then permit
set security policies global policy gl2 match source-address og1
set security policies global policy gl2 match source-address og3
set security policies global policy gl2 match source-address on1
set security policies global policy gl2 match destination-address og12
set security policies global policy gl2 match destination-address og4
set security policies global policy gl2 match destination-address on2
set security policies global policy gl2 match application junos-http
set security policies global policy gl2 then permit
set security policies global policy glpol1 match source-address app_c_b
set security policies global policy glpol1 match destination-address og3
set security policies global policy glpol1 match application junos-https
set security policies global policy glpol1 then permit
set security policies global policy glt1 match source-address og1_5_6
set security policies global policy glt1 match destination-address og1_7_8
set security policies global policy glt1 match application junos-ssh
set security policies global policy glt1 then permit
set security policies global policy glt2 match source-address og1_5_6_cl
set security policies global policy glt2 match destination-address og1_7_8
set security policies global policy glt2 match application junos-ssh
set security policies global policy glt2 then permit
set security policies global policy glt3 match source-address og5_6
set security policies global policy glt3 match destination-address og7_8
set security policies global policy glt3 match application junos-ssh
set security policies global policy glt3 then permit

set security policies global policy glt4 match source-address og5_6
set security policies global policy glt4 match destination-address og7_8
set security policies global policy glt4 match destination-address app_a_b
set security policies global policy glt4 match application junos-ssh
set security policies global policy glt4 then permit

set security policies global policy glt5 match source-address og5_6
set security policies global policy glt5 match source-address app_c_b
set security policies global policy glt5 match destination-address og7_8
set security policies global policy glt5 match destination-address app_a_b
set security policies global policy glt5 match application junos-ssh
set security policies global policy glt5 then permit

set security policies global policy glt2cl match source-address og1_5_6_cl
set security policies global policy glt2cl match destination-address og1_7_8
set security policies global policy glt2cl match application junos-ssh
set security policies global policy glt2cl then permit

set security policies global policy glt4ag match source-address og5_6
set security policies global policy glt4ag match destination-address og7_8
set security policies global policy glt4ag match destination-address app_a_b
set security policies global policy glt4ag match application junos-https
set security policies global policy glt4ag then permit

python program reads and converts to csv file

gl1;['og1'];['og12'];['junos-ssh']
gl2;['og1', 'og3', 'on1'];['og12', 'og4', 'on2'];['junos-http']
glpol1;['app_c_b'];['og3'];['junos-https']
glt1;['og1_5_6'];['og1_7_8'];['junos-ssh']
glt2;['og1_5_6_cl'];['og1_7_8'];['junos-ssh']
glt3;['og5_6'];['og7_8'];['junos-ssh']
glt4;['og5_6'];['app_a_b', 'og7_8'];['junos-ssh']

glt5;['app_c_b', 'og5_6'];['app_a_b', 'og7_8'];['junos-ssh']
glt2cl;['og1_5_6_cl'];['og1_7_8'];['junos-ssh']
glt4ag;['og5_6'];['app_a_b', 'og7_8'];['junos-https']

The first column is the name, the second is a python list with source address objects, the third is the destination, and the fourth is the application.

the second program either searches for a complete match of objects, which means a repeating rule, meaning one of them can be deleted, or searches for rules where the objects are a subset of the objects of another rule, which means a shadow rule, or searches for rules where two groups of objects match, meaning two rules can be grouped by the third column.

Next python

#!/usr/bin/python3
#  usage " python srx_policy_to_csv fw_name "                  ------- SRX FW to create CSV file with policies

import csv, sys
from sys import argv

args = sys.argv                                           # Set the input and output file names
input_file   = args[1] +'.conf'                           # "juniper_srx_policies  .csv"
output_file  = args[1] + '_all.csv'                       # "_all.csv"

csv_list = []
# Open the input and output files
with open(input_file, "r") as f, open(output_file, "w", newline="") as out_file:
    reader = csv.reader(f, delimiter=" ")
    writer = csv.writer(out_file, delimiter=";")          # semicolon delimiter

    policy_name=""
    src_list , dst_list, app_list = [] , [] , []
    for row in reader:                                    # Loop over each row in the input file
        rrr = row
        if row == []:
            continue
        if not (row[0] == "set"):
            continue
        if ((row[0] == "set") and (row[1] == "security") and (row[2] == "policies") and ("policy" in row)):
            if ((policy_name ==  row[(row.index('policy')+1)])):
#                print(row)
                if ("source-address" in row):
                    src_list.append( row[(row.index('source-address')+1)] )
                if ("destination-address" in row):
                    dst_list.append( row[(row.index('destination-address')+1)] )
                if ("application" in row):
                    app_list.append( row[(row.index('application')+1)] )            
            else:
                src_list.sort()
                dst_list.sort()
                app_list.sort()
                outstr = policy_name+','+ str(src_list)+','+str(dst_list)+','+str(app_list)
                if not policy_name == '':
                    csv_list.append(outstr)
                    writer.writerow([policy_name, str(src_list), str(dst_list), str(app_list)]) 
#                print( ' added ',outstr, ' to ', csv_list)
                policy_name =  row[(row.index('policy')+1)]
                src_list , dst_list, app_list = [] , [] , []
                if ("source-address" in row):
                    src_list.append( row[(row.index('source-address')+1)] )
                if ("destination-address" in row):
                    dst_list.append( row[(row.index('destination-address')+1)] )
                if ("application" in row):
                    app_list.append( row[(row.index('application')+1)] )

                
    src_list.sort()
    dst_list.sort()
    app_list.sort()
                    
    outstr = policy_name+','+ str(src_list)+','+str(dst_list)+','+str(app_list)
    csv_list.append(outstr)
    writer.writerow([policy_name, str(src_list), str(dst_list), str(app_list)]) 
    print('      ---------  ') 
    print(csv_list) 
    print('      ---------  ')           

and the second

#!/usr/bin/python3
#  usage " python shadow.py  fw_name"           --- search SRX duplicate shadow rules       file_path="conf _all.csv"

import csv, sys, re, ast, ipaddress, pandas as pd
from sys import argv

def c_s_t_l(string):              # convert a string that looks like a list to an actual list
    try:                                         # convert_string_to_list(string): 
        return ast.literal_eval(string)          # Return list
    except (ValueError, SyntaxError):
        return string                            # Return the original string if it's not a list

##############  main

args = sys.argv                                         #  Set the input and output file names
file_path = args[1]+'_all.csv'                          #  read  " juniper_srx   policies      .csv"
textfile  = open(file_path, "r")
textf     = textfile.read()

d_output_file = args[1] +'_dup_source_dest.csv'         #  write "         _dup.csv"
f_output_file = args[1] +'_dup_full.csv'                #  write "         _dup.csv"
s_output_file = args[1] +'_sha.csv'                     #  write "         _sha.csv"
dtextfile  = open(d_output_file, "w")
ftextfile  = open(f_output_file, "w")
stextfile  = open(s_output_file, "w")

nlines  = textf.strip().splitlines()
nlines1 = nlines
c1, c2, c3 = 0 , 0 , 0
for fline in nlines:
    row = fline.split(';')                         # Split each line by (';') 
    for fline1 in nlines1:
        row1 = fline1.split(';')                   # Split each line by (';') 
        if row[0] == row1[0]:
            continue

        if ((c_s_t_l(row[1]) == (c_s_t_l(row1[1]))) and ((c_s_t_l(row[2])) == (c_s_t_l(row1[2])))):        #    find duplicate
            if (c_s_t_l(row[3]) == (c_s_t_l(row1[3]))):
                c1 = c1 + 1
                print('          ----------- ',c1 , file=ftextfile)
                print(row  , file=ftextfile)
                print(row1 , file=ftextfile)
                continue                
            else:
                c2 = c2 + 1
                print('          ----------- ',c2 , file=dtextfile)
                print(row  , file=dtextfile)
                print(row1 , file=dtextfile)   
                continue 
        if (set(c_s_t_l(row[1])).issubset(c_s_t_l(row1[1])) and set(c_s_t_l(row[2])).issubset(c_s_t_l(row1[2])) and set(c_s_t_l(row[3])).issubset(c_s_t_l(row1[3]))):
            c3 = c3 + 1
            print('          ----------- ',c3 , file=stextfile)
            print(row  , file=stextfile)
            print(row1 , file=stextfile)   
 #           continue 

Please, any comments and questions are welcome.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *