commit 3a14e9be0999fa354fc632250f888603619fa0b0 Author: Nik Rozman Date: Fri Oct 27 17:17:20 2023 +0200 Initial commit diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ce9ccf6 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module sflow-abuse + +go 1.20 diff --git a/ignored.txt b/ignored.txt new file mode 100644 index 0000000..757d890 --- /dev/null +++ b/ignored.txt @@ -0,0 +1 @@ +192.168.1.191/32 \ No newline at end of file diff --git a/main.go b/main.go new file mode 100644 index 0000000..c5509e8 --- /dev/null +++ b/main.go @@ -0,0 +1,127 @@ +package main + +import ( + "bufio" + "encoding/csv" + "fmt" + "net" + "os" + "sflow-abuse/src" + "syscall" + "time" +) + +// Function to check if an IP address is within any of the specified subnets. +func isIPInSubnets(ip string, subnets []string, ignored []string) bool { + addr := net.ParseIP(ip) + if addr == nil { + return false + } + + for _, subnetStr := range subnets { + _, subnet, err := net.ParseCIDR(subnetStr) + if err != nil { + fmt.Printf("Error parsing subnet %s: %v\n", subnetStr, err) + continue + } + + if subnet.Contains(addr) && !isIPInSubnets(ip, ignored, []string{}) { + return true + } + } + + return false +} + +func main() { + // Define the path for the named pipe (FIFO). + pipePath := "/tmp/sflow-abuse" + + // Create the named pipe (FIFO) if it doesn't exist. + if _, err := os.Stat(pipePath); os.IsNotExist(err) { + if err := syscall.Mkfifo(pipePath, 0666); err != nil { + fmt.Printf("Error creating named pipe: %v\n", err) + return + } + } + + fmt.Printf("Listening on named pipe (FIFO): %s\n", pipePath) + + // Read subnets from a file (one subnet per line). + subnetsFile, err := os.Open("subnets.txt") + if err != nil { + fmt.Printf("Error opening subnets file: %v\n", err) + return + } + defer subnetsFile.Close() + + var subnets []string + scanner := bufio.NewScanner(subnetsFile) + for scanner.Scan() { + subnets = append(subnets, scanner.Text()) + } + + // Read ignored subnets from a file (one subnet per line). + ignoredFile, err := os.Open("ignored.txt") + if err != nil { + fmt.Printf("Error opening ignored file: %v\n", err) + return + } + defer ignoredFile.Close() + + var ignored []string + scanner = bufio.NewScanner(ignoredFile) + for scanner.Scan() { + ignored = append(ignored, scanner.Text()) + } + + // Print the subnets that will be used for filtering. + fmt.Printf("Loaded %d subnets: %v\n", len(subnets), subnets) + fmt.Printf("Loaded %d ignored subnets: %v\n", len(ignored), ignored) + + // Create an instance of EventCounter to track events. + eventCounter := src.NewEventCounter(1*time.Minute, src.HandleAlert) + go eventCounter.StartMonitoring() + + // Open the named pipe for reading. + pipeFile, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe) + if err != nil { + fmt.Printf("Error opening named pipe: %v\n", err) + return + } + defer pipeFile.Close() + + // Create a CSV reader to read data from the named pipe. + csvReader := csv.NewReader(pipeFile) + + // Process each row of CSV data. + for { + row, err := csvReader.Read() + if err != nil { + if err.Error() == "EOF" { + fmt.Println("Reached EOF, exiting.") + break + } else { + // CSV is malformed, skip the line. + continue + } + } + + // Ensure the row contains at least six columns (source IP, destination IP, and destination port). + if len(row) >= 16 { + if net.ParseIP(row[9]) != nil { + sourceIP := row[9] + // Check if the source IP is within any of the specified subnets. + if isIPInSubnets(sourceIP, subnets, ignored) { + destinationPort := row[15] + + // Count the event for the source IP and port combination. + eventCounter.CountEvent(sourceIP, destinationPort) + } + } + } + } + + // Stop the event monitoring when finished. + eventCounter.StopMonitoring() +} diff --git a/src/event_counter.go b/src/event_counter.go new file mode 100644 index 0000000..e6131c4 --- /dev/null +++ b/src/event_counter.go @@ -0,0 +1,72 @@ +package src + +import ( + "fmt" + "strings" + "sync" + "time" +) + +// EventCounter is a structure to keep track of events. +type EventCounter struct { + counts map[string]int + mutex sync.Mutex + resetTimer *time.Ticker + resetPeriod time.Duration + alertHandler func(ip, port string, count int) +} + +// NewEventCounter creates a new EventCounter instance. +func NewEventCounter(resetPeriod time.Duration, alertHandler func(ip, port string, count int)) *EventCounter { + return &EventCounter{ + counts: make(map[string]int), + resetTimer: time.NewTicker(resetPeriod), + resetPeriod: resetPeriod, + alertHandler: alertHandler, + } +} + +// CountEvent increments the count for a specific IP and port combination. +func (ec *EventCounter) CountEvent(ip, port string) { + key := ip + ":" + port + ec.mutex.Lock() + ec.counts[key]++ + ec.mutex.Unlock() +} + +// StartMonitoring starts the event monitoring and alerts. +func (ec *EventCounter) StartMonitoring() { + for range ec.resetTimer.C { + ec.mutex.Lock() + for key, count := range ec.counts { + if count >= 100 { + ip, port := parseKey(key) + ec.alertHandler(ip, port, count) + } + } + ec.resetCounts() + ec.mutex.Unlock() + } +} + +func parseKey(key string) (string, string) { + parts := strings.Split(key, ":") + if len(parts) == 2 { + return parts[0], parts[1] + } + return "", "" +} + +func (ec *EventCounter) resetCounts() { + ec.counts = make(map[string]int) +} + +// StopMonitoring stops the event monitoring. +func (ec *EventCounter) StopMonitoring() { + ec.resetTimer.Stop() +} + +// HandleAlert is a placeholder for alert handling logic. +func HandleAlert(ip, port string, count int) { + fmt.Printf("Alert: Source IP %s, Port %s exceeded the threshold with a count of %d\n", ip, port, count) +} diff --git a/subnets.txt b/subnets.txt new file mode 100644 index 0000000..dbee727 --- /dev/null +++ b/subnets.txt @@ -0,0 +1,2 @@ +212.192.29.0/29 +192.168.0.0/23 \ No newline at end of file