55require 'time'
66require 'timecop'
77require 'zlib'
8+ require 'zstd-ruby'
89require 'fluent/file_wrapper'
910
1011class FileOutputTest < Test ::Unit ::TestCase
@@ -397,20 +398,32 @@ def create_driver(conf = CONFIG, opts = {})
397398 end
398399 end
399400
400- def check_gzipped_result ( path , expect )
401+ def check_zipped_result ( path , expect , type : :gzip )
401402 # Zlib::GzipReader has a bug of concatenated file: https://bugs.ruby-lang.org/issues/9790
402403 # Following code from https://www.ruby-forum.com/topic/971591#979520
403404 result = ''
404- File . open ( path , "rb" ) { |io |
405- loop do
406- gzr = Zlib ::GzipReader . new ( StringIO . new ( io . read ) )
407- result << gzr . read
408- unused = gzr . unused
409- gzr . finish
410- break if unused . nil?
411- io . pos -= unused . length
412- end
413- }
405+ if type == :gzip || type == :gz
406+ File . open ( path , "rb" ) { |io |
407+ loop do
408+ gzr = Zlib ::GzipReader . new ( StringIO . new ( io . read ) )
409+ result << gzr . read
410+ unused = gzr . unused
411+ gzr . finish
412+ break if unused . nil?
413+ io . pos -= unused . length
414+ end
415+ }
416+ elsif type == :zstd
417+ File . open ( path , "rb" ) { |io |
418+ loop do
419+ reader = Zstd ::StreamReader . new ( StringIO . new ( io . read ) )
420+ result << reader . read ( 1024 )
421+ break if io . eof?
422+ end
423+ }
424+ else
425+ raise "Invalid compression type to check"
426+ end
414427
415428 assert_equal expect , result
416429 end
@@ -421,7 +434,7 @@ def check_result(path, expect)
421434 end
422435
423436 sub_test_case 'write' do
424- test 'basic case' do
437+ test 'basic case with gz ' do
425438 d = create_driver
426439
427440 assert_false File . exist? ( "#{ TMP_DIR } /out_file_test.20110102_0.log.gz" )
@@ -433,7 +446,29 @@ def check_result(path, expect)
433446 end
434447
435448 assert File . exist? ( "#{ TMP_DIR } /out_file_test.20110102_0.log.gz" )
436- check_gzipped_result ( "#{ TMP_DIR } /out_file_test.20110102_0.log.gz" , %[2011-01-02T13:14:15Z\t test\t {"a":1}#{ @default_newline } ] + %[2011-01-02T13:14:15Z\t test\t {"a":2}#{ @default_newline } ] )
449+ check_zipped_result ( "#{ TMP_DIR } /out_file_test.20110102_0.log.gz" , %[2011-01-02T13:14:15Z\t test\t {"a":1}#{ @default_newline } ] + %[2011-01-02T13:14:15Z\t test\t {"a":2}#{ @default_newline } ] )
450+ end
451+
452+ test 'write with zstd compression' do
453+ d = create_driver %[
454+ path #{ TMP_DIR } /out_file_test
455+ compress zstd
456+ utc
457+ <buffer>
458+ timekey_use_utc true
459+ </buffer>
460+ ]
461+
462+ assert_false File . exist? ( "#{ TMP_DIR } /out_file_test.20110102_0.log.zstd" )
463+
464+ time = event_time ( "2011-01-02 13:14:15 UTC" )
465+ d . run ( default_tag : 'test' ) do
466+ d . feed ( time , { "a" => 1 } )
467+ d . feed ( time , { "a" => 2 } )
468+ end
469+
470+ assert File . exist? ( "#{ TMP_DIR } /out_file_test.20110102_0.log.zstd" )
471+ check_zipped_result ( "#{ TMP_DIR } /out_file_test.20110102_0.log.zstd" , %[2011-01-02T13:14:15Z\t test\t {"a":1}#{ @default_newline } ] + %[2011-01-02T13:14:15Z\t test\t {"a":2}#{ @default_newline } ] , type : :zstd )
437472 end
438473 end
439474
@@ -481,7 +516,7 @@ def parse_system(text)
481516
482517 assert File . exist? ( "#{ TMP_DIR_WITH_SYSTEM } /out_file_test.20110102_0.log.gz" )
483518
484- check_gzipped_result ( "#{ TMP_DIR_WITH_SYSTEM } /out_file_test.20110102_0.log.gz" , %[2011-01-02T13:14:15Z\t test\t {"a":1}\n ] + %[2011-01-02T13:14:15Z\t test\t {"a":2}\n ] )
519+ check_zipped_result ( "#{ TMP_DIR_WITH_SYSTEM } /out_file_test.20110102_0.log.gz" , %[2011-01-02T13:14:15Z\t test\t {"a":1}\n ] + %[2011-01-02T13:14:15Z\t test\t {"a":2}\n ] )
485520 dir_mode = "%o" % File ::stat ( TMP_DIR_WITH_SYSTEM ) . mode
486521 assert_equal ( OVERRIDE_DIR_PERMISSION , dir_mode [ -3 , 3 ] . to_i )
487522 file_mode = "%o" % File ::stat ( "#{ TMP_DIR_WITH_SYSTEM } /out_file_test.20110102_0.log.gz" ) . mode
@@ -500,7 +535,7 @@ def parse_system(text)
500535 end
501536
502537 path = d . instance . last_written_path
503- check_gzipped_result ( path , %[#{ Yajl . dump ( { "a" => 1 , 'time' => time . to_i } ) } #{ @default_newline } ] + %[#{ Yajl . dump ( { "a" => 2 , 'time' => time . to_i } ) } #{ @default_newline } ] )
538+ check_zipped_result ( path , %[#{ Yajl . dump ( { "a" => 1 , 'time' => time . to_i } ) } #{ @default_newline } ] + %[#{ Yajl . dump ( { "a" => 2 , 'time' => time . to_i } ) } #{ @default_newline } ] )
504539 end
505540
506541 test 'ltsv' do
@@ -513,7 +548,7 @@ def parse_system(text)
513548 end
514549
515550 path = d . instance . last_written_path
516- check_gzipped_result ( path , %[a:1\t time:2011-01-02T13:14:15Z#{ @default_newline } ] + %[a:2\t time:2011-01-02T13:14:15Z#{ @default_newline } ] )
551+ check_zipped_result ( path , %[a:1\t time:2011-01-02T13:14:15Z#{ @default_newline } ] + %[a:2\t time:2011-01-02T13:14:15Z#{ @default_newline } ] )
517552 end
518553
519554 test 'single_value' do
@@ -526,7 +561,7 @@ def parse_system(text)
526561 end
527562
528563 path = d . instance . last_written_path
529- check_gzipped_result ( path , %[1#{ @default_newline } ] + %[2#{ @default_newline } ] )
564+ check_zipped_result ( path , %[1#{ @default_newline } ] + %[2#{ @default_newline } ] )
530565 end
531566 end
532567
@@ -547,23 +582,24 @@ def parse_system(text)
547582
548583 path = write_once . call
549584 assert_equal "#{ TMP_DIR } /out_file_test.20110102_0.log.gz" , path
550- check_gzipped_result ( path , formatted_lines )
585+ check_zipped_result ( path , formatted_lines )
551586 assert_equal 1 , Dir . glob ( "#{ TMP_DIR } /out_file_test.*" ) . size
552587
553588 path = write_once . call
554589 assert_equal "#{ TMP_DIR } /out_file_test.20110102_1.log.gz" , path
555- check_gzipped_result ( path , formatted_lines )
590+ check_zipped_result ( path , formatted_lines )
556591 assert_equal 2 , Dir . glob ( "#{ TMP_DIR } /out_file_test.*" ) . size
557592
558593 path = write_once . call
559594 assert_equal "#{ TMP_DIR } /out_file_test.20110102_2.log.gz" , path
560- check_gzipped_result ( path , formatted_lines )
595+ check_zipped_result ( path , formatted_lines )
561596 assert_equal 3 , Dir . glob ( "#{ TMP_DIR } /out_file_test.*" ) . size
562597 end
563598
564599 data (
565- "with compression" => true ,
566- "without compression" => false ,
600+ "without compression" => "text" ,
601+ "with gzip compression" => "gz" ,
602+ "with zstd compression" => "zstd"
567603 )
568604 test 'append' do |compression |
569605 time = event_time ( "2011-01-02 13:14:15 UTC" )
@@ -578,8 +614,8 @@ def parse_system(text)
578614 timekey_use_utc true
579615 </buffer>
580616 ]
581- if compression
582- config << " compress gz "
617+ if compression != :text
618+ config << " compress #{ compression } "
583619 end
584620 d = create_driver ( config )
585621 d . run ( default_tag : 'test' ) {
@@ -590,16 +626,16 @@ def parse_system(text)
590626 }
591627
592628 log_file_name = "out_file_test.20110102.log"
593- if compression
594- log_file_name << ".gz "
629+ if compression != "text"
630+ log_file_name << ".#{ compression } "
595631 end
596632
597633 1 . upto ( 3 ) do |i |
598634 path = write_once . call
599635 assert_equal "#{ TMP_DIR } /#{ log_file_name } " , path
600636 expect = formatted_lines * i
601- if compression
602- check_gzipped_result ( path , expect )
637+ if compression != "text"
638+ check_zipped_result ( path , expect , type : compression . to_sym )
603639 else
604640 check_result ( path , expect )
605641 end
@@ -630,15 +666,15 @@ def parse_system(text)
630666
631667 path = write_once . call
632668 assert_equal "#{ TMP_DIR } /out_file_test.20110102.log.gz" , path
633- check_gzipped_result ( path , formatted_lines )
669+ check_zipped_result ( path , formatted_lines )
634670
635671 path = write_once . call
636672 assert_equal "#{ TMP_DIR } /out_file_test.20110102.log.gz" , path
637- check_gzipped_result ( path , formatted_lines * 2 )
673+ check_zipped_result ( path , formatted_lines * 2 )
638674
639675 path = write_once . call
640676 assert_equal "#{ TMP_DIR } /out_file_test.20110102.log.gz" , path
641- check_gzipped_result ( path , formatted_lines * 3 )
677+ check_zipped_result ( path , formatted_lines * 3 )
642678 end
643679 end
644680
@@ -667,15 +703,15 @@ def parse_system(text)
667703 path = write_once . call
668704 # Rotated at 2011-01-02 17:00:00+02:00
669705 assert_equal "#{ TMP_DIR } /out_file_test.20110103.log.gz" , path
670- check_gzipped_result ( path , formatted_lines )
706+ check_zipped_result ( path , formatted_lines )
671707
672708 path = write_once . call
673709 assert_equal "#{ TMP_DIR } /out_file_test.20110103.log.gz" , path
674- check_gzipped_result ( path , formatted_lines * 2 )
710+ check_zipped_result ( path , formatted_lines * 2 )
675711
676712 path = write_once . call
677713 assert_equal "#{ TMP_DIR } /out_file_test.20110103.log.gz" , path
678- check_gzipped_result ( path , formatted_lines * 3 )
714+ check_zipped_result ( path , formatted_lines * 3 )
679715 end
680716 end
681717
@@ -871,6 +907,10 @@ def run_and_check(d, symlink_path)
871907 test 'returns .gz for gzip' do
872908 assert_equal '.gz' , @i . compression_suffix ( :gzip )
873909 end
910+
911+ test 'returns .zstd for zstd' do
912+ assert_equal '.zstd' , @i . compression_suffix ( :zstd )
913+ end
874914 end
875915
876916 sub_test_case '#generate_path_template' do
0 commit comments