Skip to content

feat: CSV dumping (Claude's version) #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 77 additions & 13 deletions iptables-mon.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import argparse
import time
import curses
import csv
import os
from datetime import datetime

def get_rules_with_counters(chain):
result = subprocess.run(['iptables', '-S', chain, '-v'],
result = subprocess.run(['iptables', '-S', chain, '-v', '-t', 'mangle'],
capture_output=True, text=True)
lines = result.stdout.strip().splitlines()
rules = []
Expand Down Expand Up @@ -52,51 +55,93 @@ def main(stdscr, args, rule_command):
# Clear and setup screen
stdscr.clear()
# Hide cursor
curses.curs_set(0)
curses.curs_set(0)

stdscr.addstr(0, 0, f"Monitoring chain '{args.chain}', rule index {args.number}, refresh rate {args.refresh}s")
stdscr.addstr(1, 0, f"Rule command: {rule_command}")
if args.file:
stdscr.addstr(2, 0, f"CSV logging to: {args.file}")
display_row = "Throughput: "
# stdscr.addstr(3, 0, f"{display_row}")
row = 3
row = 4 if args.file else 3

initial_packets, initial_bytes, _ = get_counters(args.chain, args.number)
if initial_bytes is None:
stdscr.addstr(5, 0, "Failed to retrieve counters.")
stdscr.addstr(row + 2, 0, "Failed to retrieve counters.")
stdscr.refresh()
time.sleep(2)
return

# start from 0 for this measurement sessions
# Initialize CSV file if specified
csv_file = None
csv_writer = None
if args.file:
try:
csv_file = open(args.file, 'w', newline='')
csv_writer = csv.writer(csv_file)
# Write CSV header
csv_writer.writerow(['timestamp', 'throughput_bps', 'total_bytes', 'packets_diff', 'total_packets'])
csv_file.flush() # Ensure header is written immediately
except IOError as e:
stdscr.addstr(row + 2, 0, f"Error creating CSV file: {e}")
stdscr.refresh()
time.sleep(2)
return

# Start from 0 for this measurement session
total_bytes = 0
total_packets = 0
initial_packets_count = initial_packets

try:
while True:
while True:
packets, bytes_count, _ = get_counters(args.chain, args.number)
if bytes_count is None:
continue

# Calculate bytes difference
# Calculate differences
bytes_diff = bytes_count - initial_bytes
packets_diff = packets - initial_packets

# Update total bytes
# Update totals
total_bytes += bytes_diff
human_readable_tot_amount = human_readable_number(total_bytes, "B")
total_packets += packets_diff

# Calculate throughput in bytes per second
bytes_per_sec = bytes_diff / args.refresh

# Convert bytes difference to bits/sec
bits_per_sec = (bytes_diff * 8) / args.refresh
# Convert to bits/sec for display
bits_per_sec = bytes_per_sec * 8
human_readable_throughput = human_readable_number(bits_per_sec, "bps")
human_readable_tot_amount = human_readable_number(total_bytes, "B")

# Display throughput and total bytes
stdscr.addstr(row, 0, f"{display_row}{human_readable_throughput} | Total Bytes: {human_readable_tot_amount} ")
stdscr.refresh()

# Update initial_bytes for next iteration
# Write to CSV if enabled
if csv_writer:
timestamp = datetime.now().isoformat()
csv_writer.writerow([
timestamp,
bytes_per_sec, # Throughput in Bps (raw)
total_bytes, # Total bytes (raw)
packets_diff, # Packets difference
total_packets # Total packets
])
csv_file.flush() # Ensure data is written immediately

# Update for next iteration
initial_bytes = bytes_count
initial_packets = packets

time.sleep(args.refresh)

except KeyboardInterrupt:
pass
finally:
# Close CSV file if it was opened
if csv_file:
csv_file.close()

if __name__ == "__main__":
# Argument parsing outside curses to handle help display properly
Expand All @@ -107,13 +152,30 @@ def main(stdscr, args, rule_command):
help='Rule number (index) in the chain to monitor.')
parser.add_argument('-r', '--refresh', type=float, default=1.0,
help='Refresh interval in seconds.')
parser.add_argument('-f', '--file', type=str,
help='CSV file path for logging measurements (created in current directory).')
args = parser.parse_args()

# Validate refresh interval
if args.refresh <= 0:
print("Error: Refresh interval must be greater than zero.")
sys.exit(1)

# Validate CSV file path if provided
if args.file:
# Ensure we're writing to current directory
csv_path = os.path.basename(args.file)
if csv_path != args.file:
print(f"Warning: File will be created in current directory as '{csv_path}'")
args.file = csv_path

# Check if file already exists
if os.path.exists(args.file):
response = input(f"File '{args.file}' already exists. Overwrite? (y/N): ")
if response.lower() != 'y':
print("Aborted.")
sys.exit(1)

# Verify that rule number is valid before starting curses
rules = get_rules_with_counters(args.chain)
if not rules:
Expand All @@ -127,6 +189,8 @@ def main(stdscr, args, rule_command):
# Get the specific rule command
rule_command = rules[args.number - 1]['line']
print(f"Selected Rule {args.number}: {rule_command}")
if args.file:
print(f"CSV logging enabled: {args.file}")

# Run curses wrapper with args and rule_command
curses.wrapper(main, args, rule_command)