diff --git a/collector/codec.go b/collector/codec.go new file mode 100644 index 0000000..4644d8f --- /dev/null +++ b/collector/codec.go @@ -0,0 +1,305 @@ +package collector + +import ( + "encoding/binary" + "fmt" + "io" + "os" + + "github.com/buger/jsonparser" + "github.com/klauspost/compress/zstd" + log "github.com/openmesh-network/core/internal/logger" + "github.com/pierrec/lz4" + "go.mongodb.org/mongo-driver/bson" +) + +// If using json parser, use this func to dettermine datatypes etc. +func HandleKeyValueType(key, value []byte, dataType jsonparser.ValueType, offset int) error { + fmt.Printf("Key: %s, Value: %s, Type: %v\n", string(key), string(value), dataType) + return nil +} + +func SetupZstCompressionFile(destFilePath string) (*os.File, *zstd.Encoder, error) { + var err error + + var encoder *zstd.Encoder + var file *os.File + + file, err = os.OpenFile(destFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return nil, nil, err + } + + // Create a new encoder writing to file, with cleanup to close the file if error. + encoder, err = zstd.NewWriter(file) + if err != nil { + file.Close() + return nil, nil, err + } + + return file, encoder, nil +} + +func SetupLz4CompressionFile(destFilePath string) (*os.File, *lz4.Writer, error) { + file, err := os.OpenFile(destFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return nil, nil, err + } + + encoder := lz4.NewWriter(file) + + return file, encoder, nil +} + +func CompressBSONFileLZ4(sourceFilePath, destFilePath string, chunkSize int) error { + sourceFile, err := os.Open(sourceFilePath) + if err != nil { + return err + } + defer sourceFile.Close() + + destFile, err := os.Create(destFilePath) + if err != nil { + return err + } + defer destFile.Close() + + encoder := lz4.NewWriter(destFile) + defer encoder.Close() + + buf := make([]byte, chunkSize) + for { + n, err := sourceFile.Read(buf) + if err != nil { + if err == io.EOF { + break + } + return err + } + if n > 0 { + if _, err := encoder.Write(buf[:n]); err != nil { + return err + } + } + } + return nil +} + +func CompressBSONFileZst(sourceFilePath, destFilePath string, chunkSize int) error { + sourceFile, err := os.Open(sourceFilePath) + if err != nil { + return err + } + defer sourceFile.Close() + + destFile, err := os.Create(destFilePath) + if err != nil { + return err + } + defer destFile.Close() + + encoder, err := zstd.NewWriter(destFile) + if err != nil { + return err + } + defer encoder.Close() + + buf := make([]byte, chunkSize) + for { + n, err := sourceFile.Read(buf) + if err != nil { + if err == io.EOF { + break + } + return err + } + if n > 0 { + if _, err := encoder.Write(buf[:n]); err != nil { + return err + } + } + } + return nil +} + +func DecompressAndReadBSONLZ4(sourceFilePath string) error { + + compressedFile, err := os.Open(sourceFilePath) + if err != nil { + return fmt.Errorf("failed to open compressed file: %v", err) + } + defer compressedFile.Close() + + decoder := lz4.NewReader(compressedFile) + + bsonBuffer := make([]byte, 4) + // msgCount := 0 + + for { + if _, err := io.ReadFull(decoder, bsonBuffer); err != nil { + if err == io.EOF { + fmt.Println("Reached end of file") + break + } + return fmt.Errorf("failed to read BSON length: %v", err) + } + + length := int(binary.BigEndian.Uint32(bsonBuffer)) + if length <= 0 { + return fmt.Errorf("invalid BSON document length: %d", length) + } + + bsonData := make([]byte, length) + if _, err := io.ReadFull(decoder, bsonData); err != nil { + return fmt.Errorf("failed to read 
BSON data: %v", err) + } + + var msg map[string]interface{} + if err := bson.Unmarshal(bsonData, &msg); err != nil { + return fmt.Errorf("failed to unmarshal BSON: %v", err) + } + + // msgCount++ + // fmt.Println("Lz4 Decompressed BSON document count:", msg, msgCount) + + } + + return nil +} + +func DecompressAndReadBSONZst(destFilePath string) error { + + compressedFile, err := os.Open(destFilePath) + if err != nil { + return fmt.Errorf("failed to open compressed file: %v", err) + } + defer compressedFile.Close() + + decoder, err := zstd.NewReader(compressedFile) + if err != nil { + return fmt.Errorf("failed to create zstd reader: %v", err) + } + defer decoder.Close() + + // length prefix buffer, to read only length and skip to the next doc + bsonBuffer := make([]byte, 4) + // msgCount := 0 + + for { + // Reads length of the next BSON document + if _, err := io.ReadFull(decoder, bsonBuffer); err != nil { + if err == io.EOF { + fmt.Println("Reached end of file") + break + } + return fmt.Errorf("failed to read BSON length: %v", err) + } + + length := int(binary.BigEndian.Uint32(bsonBuffer)) + if length <= 0 { + return fmt.Errorf("invalid BSON document length: %d", length) + } + + bsonData := make([]byte, length) + // Read actual BSON data + if _, err := io.ReadFull(decoder, bsonData); err != nil { + return fmt.Errorf("failed to read BSON data: %v", err) + } + + var msg map[string]interface{} + + // Unmarshal BSON to generic interface + if err := bson.Unmarshal(bsonData, &msg); err != nil { + return fmt.Errorf("failed to unmarshal BSON: %v", err) + } + + _, err := jsoniterator.MarshalIndent(msg, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal JSON: %v", err) + } + + // msgCount++ + // fmt.Println("Decompressed JSON count :", msg, msgCount) + } + + return nil +} + +func JsonUnmarshaler(data []byte) (map[string]interface{}, error) { + obj := make(map[string]interface{}) + err := jsonparser.ObjectEach(data, func(key []byte, value []byte, dataType jsonparser.ValueType, offset int) error { + var val interface{} + var parseErr error + switch dataType { + case jsonparser.String: + val, parseErr = jsonparser.ParseString(value) + case jsonparser.Number: + // [IMP !! ] May need to handle float values + val, parseErr = jsonparser.ParseInt(value) + case jsonparser.Boolean: + val, parseErr = jsonparser.ParseBoolean(value) + case jsonparser.Null: + val = nil + default: + val = value // For objects or arrays, this will just store the raw JSON. 
+ } + if parseErr != nil { + return parseErr + } + obj[string(key)] = val + return nil + }) + + if err != nil { + return nil, err + } + return obj, nil +} + +func compressBSONFileNoChunk(sourceFilePath, destFilePath string) error { + + sourceFile, err := os.Open(sourceFilePath) + if err != nil { + return err + } + defer sourceFile.Close() + + destFile, err := os.OpenFile(destFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + log.Fatalf("Failed to open file: %v", err) + panic(err) + } + defer destFile.Close() + + // default compression level + encoder, err := zstd.NewWriter(destFile) + if err != nil { + return err + } + defer encoder.Close() + + _, err = io.Copy(encoder, sourceFile) + return err +} + +func compressBSONFileLZ4NoChunk(sourceFilePath, destFilePath string) error { + + sourceFile, err := os.Open(sourceFilePath) + if err != nil { + return err + } + defer sourceFile.Close() + + destFile, err := os.OpenFile(destFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + log.Fatalf("Failed to open file: %v", err) + panic(err) + } + defer destFile.Close() + + encoder := lz4.NewWriter(destFile) + defer encoder.Close() + + _, err = io.Copy(encoder, sourceFile) + return err +} diff --git a/collector/collector.go b/collector/collector.go index eb09d38..407a4d6 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -2,6 +2,10 @@ package collector import ( "context" + "fmt" + "strings" + "sync" + "time" "github.com/ipfs/go-cid" "github.com/multiformats/go-multicodec" @@ -241,6 +245,112 @@ func (cw *CollectorWorker) run(ctx context.Context, buffer []byte) { } } +type DataCheckResult struct { + SourceName string + Topic string + Messages int + DataSize int64 + Valid bool +} + +func (ci *CollectorInstance) CheckSourcesSanity() error { + var wg sync.WaitGroup + results := make(chan DataCheckResult, 100) + errors := make(chan error, 100) + + checkTopic := func(source Source, topic string) { + defer wg.Done() + + if !strings.Contains(source.Name, "rpc") { + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + msgChannel, errChannel, err := source.JoinFunc(ctx, source, topic) + if err != nil { + errors <- fmt.Errorf(" con failed %s with topic %s: %v", source.Name, topic, err) + return + } + + var count int + var dataSize int64 + var validDataCount int + + // data collection ttime + timer := time.NewTimer(20 * time.Second) + + for { + select { + case msg := <-msgChannel: + dataSize += int64(len(msg)) + count++ + + var data interface{} + if jsoniterator.Unmarshal(msg, &data) == nil { + validDataCount++ + } + case err := <-errChannel: + errors <- fmt.Errorf("error from source %s with topic %s: %v", source.Name, topic, err) + return + case <-timer.C: + results <- DataCheckResult{ + SourceName: source.Name, + Topic: topic, + Messages: count, + DataSize: dataSize, + Valid: validDataCount > 0, + } + return + case <-ctx.Done(): + if ctx.Err() == context.DeadlineExceeded { + errors <- fmt.Errorf("timeout occurred for source %s with topic %s", source.Name, topic) + } + return + } + } + } + } + + for _, source := range Sources { + for _, topic := range source.Topics { + wg.Add(1) + go checkTopic(source, topic) + } + } + + go func() { + wg.Wait() + close(results) + close(errors) + }() + + for { + select { + case result, ok := <-results: + if !ok { + results = nil + } else { + log.Infof("Checked %s:%s - Messages: %d, DataSize: %d, Valid: %t\n", result.SourceName, result.Topic, result.Messages, result.DataSize, 
result.Valid) + } + case err, ok := <-errors: + if !ok { + errors = nil + } else { + log.Infoln("Error:", err) + // return err + } + } + + if results == nil && errors == nil { + fmt.Println("Empty results") + break + } + } + + log.Infoln("All sources have been successfully checked.") + return nil +} + func (ci *CollectorInstance) Start(ctx context.Context) { log.Infof("Started collector instance.") ci.ctx = ctx diff --git a/collector/sources.go b/collector/sources.go index abfbd2c..ac17edb 100644 --- a/collector/sources.go +++ b/collector/sources.go @@ -305,3 +305,56 @@ func getVarFromEnv(envKey string) string { fmt.Println("Found OpenSea API Key in environment") return envVar } + +/* +// Will need to migrate to this approach for better managability +// For now during development its okay to hardcode in the code. + +type Source struct { + Name string `yaml:"Name"` + JoinFunc string `yaml:"JoinFunc"` // We will store the function name as a string --> convert later + ApiURL string `yaml:"ApiURL"` + Topics []string `yaml:"Topics"` + Request string `yaml:"Request"` + Marshaler string `yaml:"Marshaler"` // Store the marshaler type as a string --> covert later +} + +type Config struct { + Sources []Source `yaml:"Sources"` +} + +func LoadConfig(filename string) (*Config, error) { + buf, err := ioutil.ReadFile(filename) + if err != nil { + return nil, err + } + c := &Config{} + err = yaml.Unmarshal(buf, c) + if err != nil { + return nil, err + } + return c, nil +} + +var joinFuncs = map[string]func(ctx context.Context, source Source, topic string) (chan []byte, <-chan error, error){ + "defaultJoinCEX": defaultJoinCEX, + "ankrJoinRPC": ankrJoinRPC, +} + +var marshalers = map[string]MarshalFunc{ + "JsonToBsonMarshaler": &JsonToBsonMarshaler{}, + "JsonToLz4Marshaler": &JsonToLz4Marshaler{}, +} + +func setupSources(config *Config) { + for i, src := range config.Sources { + if join, ok := joinFuncs[src.JoinFunc]; ok { + config.Sources[i].JoinFunc = join + } + if marshal, ok := marshalers[src.Marshaler]; ok { + config.Sources[i].Marshaler = marshal + } + } +} + +*/ diff --git a/collector/sources.yml b/collector/sources.yml new file mode 100644 index 0000000..dc475e3 --- /dev/null +++ b/collector/sources.yml @@ -0,0 +1,63 @@ +Sources: + - Name: "binance" + JoinFunc: "defaultJoinCEX" + ApiURL: "wss://stream.binance.us:9443/ws" + Topics: + - "ethusdt" + - "solusdt" + - "btcusdt" + Request: "{ \"method\": \"SUBSCRIBE\", \"params\": [ \"{{topic}}@aggTrade\" ], \"id\": 1 }" + Marshaler: "JsonToBsonMarshaler" + + - Name: "coinbase" + JoinFunc: "defaultJoinCEX" + ApiURL: "wss://ws-feed.pro.coinbase.com" + Topics: + - "ETH-USD" + Request: "{\"type\": \"subscribe\", \"product_ids\": [ \"{{topic}}\" ], \"channels\": [ \"ticker\" ]}" + Marshaler: "JsonToBsonMarshaler" + + - Name: "dydx" + JoinFunc: "defaultJoinCEX" + ApiURL: "wss://api.dydx.exchange/v3/ws" + Topics: + - "MATIC-USD" + Request: "{\"type\": \"subscribe\", \"id\": \"{{topic}}\", \"channel\": \"v3_trades\"}" + Marshaler: "JsonToBsonMarshaler" + + - Name: "bybit" + JoinFunc: "defaultJoinCEX" + ApiURL: "wss://stream.bybit.com/v5/public/spot" + Topics: + - "orderbook.50.BTCUSDT" + - "publicTrade.BTCUSDT" + - "tickers.BTCUSDT" + - "kline.M.BTCUSDT" + Request: "{\"op\": \"subscribe\",\"args\": [\"{{topic}}\"]}" + Marshaler: "JsonToBsonMarshaler" + + - Name: "okx" + JoinFunc: "defaultJoinCEX" + ApiURL: "wss://ws.okx.com:8443/ws/v5/business" + Topics: + - "sprd-bbo-tbt" + - "sprd-books5" + - "sprd-public-trades" + - "sprd-tickers" + Request: "{\"op\": 
\"subscribe\",\"args\": [{\"channel\": \"{{topic}}\",\"sprdId\": \"BTC-USDT_BTC-USDT-SWAP\"}]}" + Marshaler: "JsonToBsonMarshaler" + + - Name: "ethereum-ankr-rpc" + JoinFunc: "ankrJoinRPC" + ApiURL: "https://rpc.ankr.com/eth" + Topics: [""] + Request: "" + Marshaler: "JsonToLz4Marshaler" + + - Name: "polygon-ankr-rpc" + JoinFunc: "ankrJoinRPC" + ApiURL: "https://rpc.ankr.com/polygon" + Topics: [""] + Request: "" + Marshaler: "JsonToLz4Marshaler" + diff --git a/collector/sources_metric_analysis.go b/collector/sources_metric_analysis.go new file mode 100644 index 0000000..b739981 --- /dev/null +++ b/collector/sources_metric_analysis.go @@ -0,0 +1,485 @@ +package collector + +import ( + "context" + "encoding/binary" + "encoding/csv" + + jsoniter "github.com/json-iterator/go" + "github.com/klauspost/compress/zstd" + "github.com/pierrec/lz4" + + "fmt" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/openmesh-network/core/internal/config" + log "github.com/openmesh-network/core/internal/logger" + "go.mongodb.org/mongo-driver/bson" +) + +// using this for now since we are unmarshaling every value +var jsoniterator = jsoniter.ConfigCompatibleWithStandardLibrary + +// All sources compress test +var filePathAll string = "compressed_files/test_bson_AllSources.bson" +var fileAll, _ = os.OpenFile(filePathAll, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + +type DataWindow struct { + SourceName string + Symbol string + StartTime string + DataSize int64 + MessageCount int + Throughput float64 +} + +type DataCollector struct { + Source Source + TimeWindow time.Duration + DataWindows map[string]*DataWindow + BusiestWindow *DataWindow +} + +type SourceMetric struct { + sourceName string + sourceTotalThroughputPerWindow map[string]float64 + busiestTimeWindowStartTime string + busiestThroughput float64 + mu sync.Mutex +} + +func NewSourceMetric(sourceName string) *SourceMetric { + return &SourceMetric{ + sourceName: sourceName, + sourceTotalThroughputPerWindow: make(map[string]float64), + busiestTimeWindowStartTime: "", + busiestThroughput: -1, + } +} + +func NewDataCollector(source Source, windowTimeSize time.Duration) *DataCollector { + return &DataCollector{ + Source: source, + TimeWindow: windowTimeSize, + DataWindows: make(map[string]*DataWindow), + BusiestWindow: &DataWindow{Throughput: -1}, + } +} + +type DataWriter struct { + file *os.File + writer *csv.Writer + mu sync.Mutex +} + +func NewDataWriter(filename string) (*DataWriter, error) { + file, err := os.Create(filename) + if err != nil { + return nil, err + } + + writer := csv.NewWriter(file) + return &DataWriter{ + file: file, + writer: writer, + }, nil +} + +func (dw *DataWriter) Write(data []string) error { + dw.mu.Lock() + defer dw.mu.Unlock() + if err := dw.writer.Write(data); err != nil { + return err + } + return nil +} + +func (dw *DataWriter) Close() error { + defer dw.file.Close() + if err := dw.writer.Error(); err != nil { + return err + } + dw.writer.Flush() + return nil +} + +// Handler to subscribe to each source and symbol & measure data size over a set period from current time. +func CalculateDataSize(t *testing.T, ctx context.Context, dataWriter *DataWriter, timeToCollect int, timeFrameWindowSize int) { + + config.Path = "../../core" + config.Name = "config" + config.ParseConfig(config.Path, true) + log.InitLogger() + + // Busiest times of a given src for a given symbol --> To detect high trading volume for a given symbol. + // Fetch data in parts of a whole. 
10 mins --> 1 min windows, each containing the size of
+	// msgs received during that period and also the number of msgs. The load can be calculated from this --> Throughput = size / window time frame
+	// for that time frame.
+	startTime := time.Now()
+	endTime := startTime.Add(time.Duration(timeToCollect) * time.Second)
+	timeFrame := time.Duration(timeFrameWindowSize) * time.Second
+	timeFormat := "15:04:05, 02 Jan 2006"
+	busyHeader := fmt.Sprintf("Busiest Time (Data collected for %d seconds between %s and %s)", timeToCollect, startTime.Format(timeFormat), endTime.Format(timeFormat))
+
+	// CSV header
+	if err := dataWriter.writer.Write([]string{"Source", "Symbol", "Data Size (bytes)", "Throughput (bytes/s)", busyHeader, "Throughput in the interval", "Messages received during interval"}); err != nil {
+		t.Fatalf("Failed to write header to CSV: %s", err)
+	}
+
+	var wg sync.WaitGroup
+	sourceMetricsList := make([]*SourceMetric, 0)
+
+	for _, source := range Sources {
+		sourceMetric := NewSourceMetric(source.Name)
+		sourceMetricsList = append(sourceMetricsList, sourceMetric)
+		for _, topic := range source.Topics {
+			dc := NewDataCollector(source, timeFrame)
+			wg.Add(1)
+			go func(src Source, tpc string, srcMetric *SourceMetric) {
+				defer wg.Done()
+
+				// Subscribe and measure until the collection period elapses for each source/symbol.
+				size, busiestWindow, _ := subscribeAndMeasure(ctx, src, tpc, time.NewTimer(time.Duration(timeToCollect)*time.Second), dc, timeFrameWindowSize, srcMetric)
+				throughput := size / int64(timeToCollect)
+				// log.Infof("Data size for %s - %s: %d bytes, with TP : %d \n", src.Name, tpc, size, throughput)
+				throughputStr := fmt.Sprintf("%.2f", busiestWindow.Throughput)
+
+				// fmt.Println("Compressing source : ", src.Name)
+
+				// CompressBSONFileZst(filePathSource+".bson", filePathSource+".zst", 4094)
+
+				// CompressBSONFileLZ4(filePathSource+".bson", filePathSource+".lz4", 4094)
+
+				// err1 := DecompressAndReadBSONLZ4(filePathSource + ".lz4")
+				// fmt.Println("decomp lz4 err ", err1)
+
+				record := []string{src.Name, tpc, fmt.Sprintf("%d", size), fmt.Sprintf("%d", throughput), busiestWindow.StartTime, throughputStr, fmt.Sprintf("%d", busiestWindow.MessageCount)}
+
+				dataWriter.mu.Lock()
+				defer dataWriter.mu.Unlock()
+				if err := dataWriter.writer.Write(record); err != nil {
+					t.Errorf("Failed to write data to CSV for %s - %s: %s", src.Name, tpc, err)
+				}
+			}(source, topic, sourceMetric)
+		}
+	}
+	wg.Wait()
+
+	// Report the busiest time window for each source over the collection period.
+	for _, sourceMetric := range sourceMetricsList {
+		log.Infof("Source: %s, Max Throughput: %.2f during %s \n",
+			sourceMetric.sourceName,
+			sourceMetric.busiestThroughput,
+			sourceMetric.busiestTimeWindowStartTime)
+	}
+
+	fileAll.Close()
+
+	fmt.Println("Compressing all sources:")
+	CompressBSONFileZst("compressed_files/"+"test_bson_AllSources.bson", "compressed_files/"+"test_compressed_bson_AllSources.zst", 4094)
+
+	CompressBSONFileLZ4("compressed_files/"+"test_bson_AllSources.bson", "compressed_files/"+"test_compressed_bson_AllSources.lz4", 4094)
+
+	fmt.Println("Decompressing all sources:")
+	err := DecompressAndReadBSONZst("compressed_files/test_compressed_bson_AllSources.zst")
+	fmt.Println("decomp err ", err)
+
+	err1 := DecompressAndReadBSONLZ4("compressed_files/test_compressed_bson_AllSources.lz4")
+	fmt.Println("decomp lz4 err ", err1)
+
+	// Flush and close the writer only after all goroutines are done (important).
+	if err := dataWriter.Close(); err != nil {
+		t.Fatal("Failed to flush and close CSV writer:", err)
+	}
+}
+
+// subscribeAndMeasure handles the subscription for a single source/symbol
+// and updates the data size as messages are received.
+func subscribeAndMeasure(ctx context.Context, source Source, symbol string, timer *time.Timer, dc *DataCollector, timeFrameWindowSize int, sourceMetrics *SourceMetric) (int64, *DataWindow, string) {
+	msgChan, err := Subscribe(ctx, source, symbol)
+	if err != nil {
+		log.Infof("Error subscribing to source %s with symbol %s: %v", source.Name, symbol, err)
+		return 0, &DataWindow{}, " "
+	}
+
+	var dataSize int64
+	var messageCount int64
+	timerLocal := timer
+
+	// InitialThroughput := float64(len(msgChan)) / float64(1)
+	globalWindow := &DataWindow{DataSize: int64(len(msgChan)), MessageCount: 1, Throughput: -1}
+	windowChange := make(chan string, 1)
+	oldWindowKey := ""
+
+	dirPath := fmt.Sprintf("compressed_files/%s", sourceMetrics.sourceName)
+
+	if err := os.MkdirAll(dirPath, 0755); err != nil {
+		log.Fatalf("Failed to create directory: %v", err)
+		panic(err)
+	}
+
+	filePathSource := fmt.Sprintf("%s/test_%s_%s", dirPath, sourceMetrics.sourceName, symbol)
+
+	// Individual source-level compression output files.
+	bsonFile, err := os.OpenFile(filePathSource+".bson", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
+	if err != nil {
+		log.Fatalf("Failed to open file: %v", err)
+		panic(err)
+	}
+
+	zstFile, zstEncoder, err := SetupZstCompressionFile(filePathSource + ".zst")
+	if err != nil {
+		fmt.Printf("error setting up zstd compression: %v\n", err)
+	}
+
+	lz4File, lz4Encoder, err := SetupLz4CompressionFile(filePathSource + ".lz4")
+	if err != nil {
+		fmt.Printf("error setting up lz4 compression: %v\n", err)
+	}
+
+	currentBufferSize := 0
+
+	var bufferSize = 8192
+
+	if source.Name == "dydx" {
+		bufferSize = 16384
+	}
+
+	buffer := make([]byte, bufferSize)
+
+	// Manage window changes
+	go func() {
+		for {
+			select {
+			case windowKey := <-windowChange:
+				window := dc.DataWindows[windowKey]
+				if window != nil {
+					throughput := float64(window.DataSize) / float64(timeFrameWindowSize)
+					window.Throughput = throughput
+
+					if throughput > globalWindow.Throughput {
+						globalWindow = window
+					}
+					// log.Infoln("Window changed for ", window.SourceName, window.Symbol, " . Startime : ", window.StartTime, " Window Throughput :", throughput, " Current max throughput : ", globalWindow.Throughput, " Message count in window : ", window.MessageCount)
+				}
+			case <-ctx.Done():
+				close(windowChange)
+				return
+			}
+		}
+	}()
+
+	// msgCount := 0
+
+	for {
+		select {
+		case msg := <-msgChan:
+
+			if len(msg) == 0 {
+				continue
+			}
+
+			var bsonData []byte
+
+			// TODO: store RPC messages
+			if !strings.Contains(source.Name, "rpc") {
+
+				var jsonData interface{}
+
+				// fmt.Println("msgcounnt origin , ", msgCount)
+				// msgCount++
+
+				// Using jsoniter for now since we unmarshal every key/value.
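+				// Summary of the write path below:
+				//   raw JSON message -> interface{} (jsoniter) -> BSON (bson.Marshal)
+				//   -> framed as [4-byte big-endian length | BSON document]
+				//   -> appended to an in-memory buffer, which is written to the .bson
+				//      file (and the zstd/lz4 encoders) once the next frame would no longer fit.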
+ if err := jsoniterator.Unmarshal(msg, &jsonData); err != nil { + log.Fatalf("Failed to unmarshal JSON: %v", err) + continue + } + + // fmt.Println("Msg", string(msg)) + // fmt.Println("Json", jsonData) + + // jsonparser library + // jsonData, err := JsonUnmarshaler(msg) + // if err != nil { + // panic(err) + // } + + bsonData, err = bson.Marshal(jsonData) + if err != nil { + log.Fatalf("Failed to marshal to BSON: %v", err) + continue + } + + bsonLength := len(bsonData) + totalLength := 4 + len(bsonData) + + if currentBufferSize+totalLength > bufferSize { + + // Buffer needs to be written and cleared because it won't fit + if currentBufferSize > 0 { + // pad, Compress and write the current buffer content to the file + fmt.Println("buf size old :", len(buffer[:currentBufferSize])) + paddingLength := bufferSize - currentBufferSize + + if paddingLength > 0 { + + for i := 0; i < paddingLength; i++ { + buffer[currentBufferSize+i] = 0 + } + } + + fmt.Println("total size exceeded ],", currentBufferSize, currentBufferSize+totalLength, bufferSize, paddingLength) + + _, err = bsonFile.Write(buffer[:currentBufferSize]) + if err != nil { + log.Fatalf(source.Name, " Failed to write to bson file: %v", err) + } + + _, err = fileAll.Write(buffer[:currentBufferSize]) + if err != nil { + log.Fatalf(source.Name, " Failed to write to All src file: %v", err) + } + + _, err := zstEncoder.Write(buffer[:currentBufferSize]) + if err != nil { + fmt.Println("Error writing") + } + fmt.Println("buf size :", len(buffer)) + + _, err1 := lz4Encoder.Write(buffer[:currentBufferSize]) + if err1 != nil { + fmt.Println("Error writing") + } + + if err := zstEncoder.Flush(); err != nil { + fmt.Println("Error flushing") + } + + if err := lz4Encoder.Flush(); err != nil { + fmt.Println("Error flushing") + } + fmt.Println("buf size new :", len(buffer)) + currentBufferSize = 0 + } + + } + + // fmt.Println("BSON data : ", source.Name, bsonData, " length : ", bsonLength) + + // Write length of the BSON data to buffer (big-endian format) + // for larger data --> use : + // binary.BigEndian.PutUint64(buffer[0:4], uint64(bsonLength)) + binary.BigEndian.PutUint32(buffer[currentBufferSize:], uint32(bsonLength)) + currentBufferSize += 4 + + // Copies BSON data to the buffer right after the length + copy(buffer[currentBufferSize:], bsonData) + currentBufferSize += bsonLength + + } + + currentTime := time.Now() + + // this rounds down the current time to the timeWindow + // if say window is 10 min, 21:16:57 is rounded to 21:10:00 + // such that we can have a map of all elements between 21:10:00 till 21:20:00 + // Format preserves quality by conv to string. 
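+			// The grouping below is what the window-change goroutine later turns into
+			// Throughput = window.DataSize / timeFrameWindowSize (bytes per second).
+			// Illustrative example: a window that accumulates 4,000,000 bytes over a
+			// 10 s frame reports a throughput of 400,000 bytes/s.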
+			windowKey := currentTime.Truncate(dc.TimeWindow).Format(time.RFC3339)
+
+			window, exists := dc.DataWindows[windowKey]
+			if !exists {
+				if oldWindowKey == "" {
+					windowChange <- windowKey
+				} else {
+					windowChange <- oldWindowKey
+				}
+				window = &DataWindow{
+					SourceName:   source.Name,
+					Symbol:       symbol,
+					StartTime:    windowKey,
+					DataSize:     0,
+					MessageCount: 0,
+				}
+				dc.DataWindows[windowKey] = window
+			}
+			oldWindowKey = windowKey
+			dc.DataWindows[windowKey].DataSize += int64(len(msg))
+			dc.DataWindows[windowKey].MessageCount += 1
+
+			dataSize += int64(len(msg))
+			messageCount += 1
+
+			// Aggregate source throughput per window.
+			// --> Every goroutine (source + symbol) shares the source-level SourceMetric structure;
+			//     they lock it and update the throughputs as windows change.
+			// --> This way, cross-goroutine access is localized to that source's topics.
+			// --> Easier to extract the data later on as well.
+			sourceMetrics.mu.Lock()
+			currentThroughput := float64(window.DataSize) / float64(timeFrameWindowSize)
+			totalThroughput := sourceMetrics.sourceTotalThroughputPerWindow[windowKey]
+			sourceMetrics.sourceTotalThroughputPerWindow[windowKey] = totalThroughput + currentThroughput
+			if totalThroughput+currentThroughput > sourceMetrics.busiestThroughput {
+				sourceMetrics.busiestThroughput = totalThroughput + currentThroughput
+				sourceMetrics.busiestTimeWindowStartTime = windowKey
+			}
+			sourceMetrics.mu.Unlock()
+
+			// Debug
+			// fmt.Println("Message received from ", source, symbol, " : ", string(msg))
+
+		case <-timerLocal.C:
+			// log.Infoln("Received entire data for : ", source.Name, symbol, ". Now stopping metric collection, message ct : ", messageCount)
+
+			closeCompression(zstEncoder, zstFile)
+			closeCompressionLz4(lz4Encoder, lz4File)
+			fmt.Println("Closed file & encoder ", filePathSource)
+			time.Sleep(3 * time.Second)
+
+			// fmt.Println("Zst Decompressing source: ", sourceMetrics.sourceName)
+			err := DecompressAndReadBSONZst(filePathSource + ".zst")
+			fmt.Println("Zst decomp err ", sourceMetrics.sourceName, err)
+
+			fmt.Println("Lz4 Decompressing source: ", sourceMetrics.sourceName)
+			err1 := DecompressAndReadBSONLZ4(filePathSource + ".lz4")
+			fmt.Println("lz4 decomp err ", sourceMetrics.sourceName, err1)
+
+			return dataSize, globalWindow, filePathSource
+		case <-ctx.Done():
+			return dataSize, globalWindow, filePathSource
+		}
+	}
+}
+
+// closeCompression cleanly closes the zstd encoder and its underlying file.
+func closeCompression(encoder *zstd.Encoder, file *os.File) error {
+	if encoder != nil {
+		if err := encoder.Close(); err != nil {
+			file.Close()
+			return err
+		}
+	}
+
+	if file != nil {
+		return file.Close()
+	}
+
+	return nil
+}
+
+// closeCompressionLz4 cleanly closes the lz4 writer and its underlying file.
+func closeCompressionLz4(encoder *lz4.Writer, file *os.File) error {
+	if encoder != nil {
+		if err := encoder.Close(); err != nil {
+			file.Close()
+			return err
+		}
+	}
+
+	if file != nil {
+		return file.Close()
+	}
+
+	return nil
+}
diff --git a/collector/sources_test.go b/collector/sources_test.go
index 1423f4d..a8b5244 100644
--- a/collector/sources_test.go
+++ b/collector/sources_test.go
@@ -209,3 +209,17 @@ func TestAnkrPolygonJoin(t *testing.T) {
 		t.Log("This ran")
 	}
 }
+
+// Use the timeout flag: go test -timeout 600s -run TestSourcesDataSize
+func TestSourcesDataSize(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	dataWriter, err := NewDataWriter("test_30min_benchmark_test.csv")
+	if err != nil {
+		t.Fatalf("Error creating CSV writer: %v", err)
+	}
+	// Time period to collect data, in seconds (integer values only).
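+	// With the values below, each (source, topic) pair is sampled for 60 seconds,
+	// split into 10-second windows (roughly six windows per pair); the busiest window
+	// per topic goes into the CSV, and the busiest window per source is logged at the end.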
+	var timeToCollect int = 60
+	var windowTimeFrame int = 10
+	CalculateDataSize(t, ctx, dataWriter, timeToCollect, windowTimeFrame)
+}
diff --git a/main.go b/main.go
index 5648cfd..a11ae71 100644
--- a/main.go
+++ b/main.go
@@ -3,9 +3,14 @@ package main
 import (
 	"context"
 	_ "embed"
+	"fmt"
 	"os"
 	"os/signal"
 	"syscall"
+	"time"
+
+	"net/http"
+	_ "net/http/pprof"
 
 	"github.com/openmesh-network/core/collector"
 	"github.com/openmesh-network/core/internal/bft"
@@ -15,8 +20,6 @@ import (
 	"github.com/openmesh-network/core/internal/logger"
 	"github.com/openmesh-network/core/networking/p2p"
 	"github.com/openmesh-network/core/updater"
-	"net/http"
-	_ "net/http/pprof"
 )
 
 const (
@@ -67,6 +70,18 @@ func main() {
 	// Need collector before bft.
 	collectorInstance := collector.New()
+
+	// Check source sanity before starting collection.
+	logger.Infoln("Checking source sanity... please wait ~20 seconds")
+	errSanity := collectorInstance.CheckSourcesSanity()
+	if errSanity != nil {
+		fmt.Println("Sanity check failed. Aborting collection and stopping program.")
+		panic(errSanity)
+	}
+
+	// debug: brief pause before starting collection
+	time.Sleep(5 * time.Second)
+
 	collectorInstance.Start(cancelCtx)
 
 	// Initialise CometBFT instance