Skip to content

Commit ccd2d49

Browse files
committed
CQLSSTableWriter with dict compression
1 parent c9e31f2 commit ccd2d49

File tree

5 files changed

+203
-6
lines changed

5 files changed

+203
-6
lines changed

src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.cassandra.db.DecoratedKey;
3535
import org.apache.cassandra.db.RegularAndStaticColumns;
3636
import org.apache.cassandra.db.SerializationHeader;
37+
import org.apache.cassandra.db.compression.CompressionDictionary;
3738
import org.apache.cassandra.db.partitions.PartitionUpdate;
3839
import org.apache.cassandra.db.rows.EncodingStats;
3940
import org.apache.cassandra.index.Index;
@@ -57,6 +58,8 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
5758
protected final Collection<Index.Group> indexGroups;
5859
protected Consumer<Collection<SSTableReader>> sstableProducedListener;
5960
protected boolean openSSTableOnProduced = false;
61+
protected CompressionDictionary compressionDictionary;
62+
protected SSTable.Owner owner;
6063

6164
protected AbstractSSTableSimpleWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns)
6265
{
@@ -81,6 +84,11 @@ protected void addIndexGroup(Index.Group indexGroup)
8184
this.indexGroups.add(indexGroup);
8285
}
8386

87+
public void setCompressionDictionary(CompressionDictionary compressionDictionary)
88+
{
89+
this.compressionDictionary = compressionDictionary;
90+
}
91+
8492
protected void setSSTableProducedListener(Consumer<Collection<SSTableReader>> listener)
8593
{
8694
this.sstableProducedListener = Objects.requireNonNull(listener, "sstableProducedListener cannot be null");
@@ -114,6 +122,20 @@ protected SSTableTxnWriter createWriter(SSTable.Owner owner) throws IOException
114122
if (makeRangeAware)
115123
return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, false, format, header);
116124

125+
126+
SSTable.Owner effectiveOwner;
127+
128+
if (this.owner != null && this.owner.compressionDictionaryManager() != null && compressionDictionary != null)
129+
{
130+
// already checks if it is cached or not
131+
this.owner.compressionDictionaryManager().add(compressionDictionary);
132+
effectiveOwner = this.owner;
133+
}
134+
else
135+
{
136+
effectiveOwner = owner;
137+
}
138+
117139
return SSTableTxnWriter.create(metadata,
118140
createDescriptor(directory, metadata.keyspace, metadata.name, format),
119141
0,
@@ -122,7 +144,7 @@ protected SSTableTxnWriter createWriter(SSTable.Owner owner) throws IOException
122144
false,
123145
header,
124146
indexGroups,
125-
owner);
147+
effectiveOwner);
126148
}
127149

128150
private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat<?, ?> fmt) throws IOException

src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,18 @@
5656
import org.apache.cassandra.db.Keyspace;
5757
import org.apache.cassandra.db.Slice;
5858
import org.apache.cassandra.db.Slices;
59+
import org.apache.cassandra.db.compression.CompressionDictionary;
5960
import org.apache.cassandra.db.marshal.AbstractType;
6061
import org.apache.cassandra.dht.IPartitioner;
6162
import org.apache.cassandra.dht.Murmur3Partitioner;
6263
import org.apache.cassandra.exceptions.InvalidRequestException;
6364
import org.apache.cassandra.exceptions.SyntaxException;
6465
import org.apache.cassandra.index.sai.StorageAttachedIndexGroup;
66+
import org.apache.cassandra.io.compress.IDictionaryCompressor;
6567
import org.apache.cassandra.io.sstable.format.SSTableFormat;
6668
import org.apache.cassandra.io.sstable.format.SSTableReader;
6769
import org.apache.cassandra.io.util.File;
70+
import org.apache.cassandra.schema.CompressionParams;
6871
import org.apache.cassandra.schema.KeyspaceMetadata;
6972
import org.apache.cassandra.schema.KeyspaceParams;
7073
import org.apache.cassandra.schema.Keyspaces;
@@ -413,6 +416,7 @@ public static class Builder
413416
private boolean buildIndexes = true;
414417
private Consumer<Collection<SSTableReader>> sstableProducedListener;
415418
private boolean openSSTableOnProduced = false;
419+
private CompressionDictionary compressionDictionary = null;
416420

417421
protected Builder()
418422
{
@@ -665,6 +669,18 @@ public Builder withFormat(SSTableFormat<?, ?> format)
665669
return this;
666670
}
667671

672+
/**
673+
* Use specific compression dictionary upon writing the data.
674+
*
675+
* @param compressionDictionary compression dictionary to use
676+
* @return this builder
677+
*/
678+
public Builder withCompressionDictionary(CompressionDictionary compressionDictionary)
679+
{
680+
this.compressionDictionary = compressionDictionary;
681+
return this;
682+
}
683+
668684
public CQLSSTableWriter build()
669685
{
670686
if (directory == null)
@@ -729,8 +745,26 @@ public CQLSSTableWriter build()
729745
Schema.instance.submit(SchemaTransformations.addTable(tableMetadata, true));
730746
}
731747

748+
if (compressionDictionary != null)
749+
{
750+
CompressionParams compressionParams = tableMetadata.params.compression;
751+
752+
if (!compressionParams.isDictionaryCompressionEnabled())
753+
{
754+
throw new IllegalStateException("Table's compressor can not accept any dictionary: " + compressionParams.asMap());
755+
}
756+
757+
IDictionaryCompressor compressor = (IDictionaryCompressor) compressionParams.getSstableCompressor();
758+
if (!compressor.canConsumeDictionary(compressionDictionary))
759+
{
760+
throw new IllegalStateException("Provided dictionary can not be consumed by table's compressor. " +
761+
"Provided dictionary type: " + compressionDictionary.kind() +
762+
"; expected dictionary type by the compressor: " + compressor.acceptableDictionaryKind());
763+
}
764+
}
765+
732766
ColumnFamilyStore cfs = null;
733-
if (buildIndexes && !indexStatements.isEmpty())
767+
if ((buildIndexes && !indexStatements.isEmpty()) || compressionDictionary != null)
734768
{
735769
KeyspaceMetadata keyspaceMetadata = ClusterMetadata.current().schema.getKeyspaceMetadata(keyspaceName);
736770
Keyspace keyspace = Keyspace.mockKS(keyspaceMetadata);
@@ -764,8 +798,8 @@ public CQLSSTableWriter build()
764798

765799
TableMetadataRef ref = tableMetadata.ref;
766800
AbstractSSTableSimpleWriter writer = sorted
767-
? new SSTableSimpleWriter(directory, ref, preparedModificationStatement.updatedColumns(), maxSSTableSizeInMiB)
768-
: new SSTableSimpleUnsortedWriter(directory, ref, preparedModificationStatement.updatedColumns(), maxSSTableSizeInMiB);
801+
? new SSTableSimpleWriter(cfs, directory, ref, preparedModificationStatement.updatedColumns(), maxSSTableSizeInMiB)
802+
: new SSTableSimpleUnsortedWriter(cfs, directory, ref, preparedModificationStatement.updatedColumns(), maxSSTableSizeInMiB);
769803

770804
if (format != null)
771805
writer.setSSTableFormatType(format);
@@ -777,6 +811,11 @@ public CQLSSTableWriter build()
777811
writer.addIndexGroup(saiGroup);
778812
}
779813

814+
if (compressionDictionary != null && cfs != null)
815+
{
816+
writer.setCompressionDictionary(compressionDictionary);
817+
}
818+
780819
if (sstableProducedListener != null)
781820
writer.setSSTableProducedListener(sstableProducedListener);
782821

src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,18 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
6969
private final DiskWriter diskWriter = new DiskWriter();
7070

7171
SSTableSimpleUnsortedWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
72+
{
73+
this(null, directory, metadata, columns, maxSSTableSizeInMiB);
74+
}
75+
76+
SSTableSimpleUnsortedWriter(SSTable.Owner owner, File directory, TableMetadataRef metadata, RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
7277
{
7378
super(directory, metadata, columns);
7479
this.maxSStableSizeInBytes = maxSSTableSizeInMiB * 1024L * 1024L;
7580
this.header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS);
7681
this.helper = new SerializationHelper(this.header);
7782
diskWriter.start();
83+
this.owner = owner;
7884
}
7985

8086
@Override

src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,27 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
6565
* Any non-positive value indicates the sstable size is unlimited.
6666
*/
6767
protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
68+
{
69+
this(null, directory, metadata, columns, maxSSTableSizeInMiB);
70+
}
71+
72+
/**
73+
* Create a SSTable writer for sorted input data.
74+
* When a positive {@param maxSSTableSizeInMiB} is defined, the writer outputs a sequence of SSTables,
75+
* whose sizes do not exceed the specified value.
76+
*
77+
* @param owner owner of sstables this writer creates
78+
* @param directory directory to store the sstable files
79+
* @param metadata table metadata
80+
* @param columns columns to update
81+
* @param maxSSTableSizeInMiB defines the max SSTable size if the value is positive.
82+
* Any non-positive value indicates the sstable size is unlimited.
83+
*/
84+
protected SSTableSimpleWriter(SSTable.Owner owner, File directory, TableMetadataRef metadata, RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
6885
{
6986
super(directory, metadata, columns);
7087
this.maxSSTableSizeInBytes = maxSSTableSizeInMiB * 1024L * 1024L;
88+
this.owner = owner;
7189
}
7290

7391
@Override

test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.LinkedHashSet;
3131
import java.util.List;
3232
import java.util.Map;
33+
import java.util.Random;
3334
import java.util.Set;
3435
import java.util.UUID;
3536
import java.util.concurrent.ExecutionException;
@@ -58,6 +59,11 @@
5859
import org.apache.cassandra.cql3.functions.types.TypeCodec;
5960
import org.apache.cassandra.cql3.functions.types.UDTValue;
6061
import org.apache.cassandra.cql3.functions.types.UserType;
62+
import org.apache.cassandra.db.compression.CompressionDictionary;
63+
import org.apache.cassandra.db.compression.CompressionDictionary.DictId;
64+
import org.apache.cassandra.db.compression.CompressionDictionaryTrainingConfig;
65+
import org.apache.cassandra.db.compression.ZstdCompressionDictionary;
66+
import org.apache.cassandra.db.compression.ZstdDictionaryTrainer;
6167
import org.apache.cassandra.db.marshal.FloatType;
6268
import org.apache.cassandra.db.marshal.UTF8Type;
6369
import org.apache.cassandra.dht.ByteOrderedPartitioner;
@@ -84,10 +90,11 @@
8490
import org.apache.cassandra.utils.FBUtilities;
8591
import org.apache.cassandra.utils.JavaDriverUtils;
8692
import org.apache.cassandra.utils.OutputHandler;
87-
import org.assertj.core.api.Assertions;
8893

94+
import static org.apache.cassandra.db.compression.CompressionDictionary.Kind.ZSTD;
8995
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
9096
import static org.assertj.core.api.Assertions.assertThat;
97+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
9198
import static org.junit.Assert.assertEquals;
9299
import static org.junit.Assert.assertFalse;
93100
import static org.junit.Assert.assertNotNull;
@@ -1671,7 +1678,7 @@ public void testConstraintViolation() throws Exception
16711678

16721679
writer.addRow(1, 4);
16731680

1674-
Assertions.assertThatThrownBy(() -> writer.addRow(2, 11))
1681+
assertThatThrownBy(() -> writer.addRow(2, 11))
16751682
.describedAs("Should throw when adding a row that violates constraints")
16761683
.isInstanceOf(ConstraintViolationException.class)
16771684
.hasMessageContaining("Column value does not satisfy value constraint for column 'v1'. It should be v1 < 5");
@@ -1690,6 +1697,111 @@ public void testConstraintViolation() throws Exception
16901697
}
16911698
}
16921699

1700+
@Test
1701+
public void testWritingWithZstdDictionaryWhenUsingInvalidCompressor()
1702+
{
1703+
// the compressor is not dictionary-aware so we will fail
1704+
final String schema = "CREATE TABLE " + qualifiedTable + " ("
1705+
+ " k int,"
1706+
+ " v1 text,"
1707+
+ " PRIMARY KEY (k)"
1708+
+ ") WITH compression = {'class': 'ZstdCompressor'}";
1709+
1710+
assertThatThrownBy(() -> CQLSSTableWriter.builder()
1711+
.inDirectory(dataDir)
1712+
.forTable(schema)
1713+
.using("INSERT INTO " + keyspace + '.' + table + " (k, v1) VALUES (?, ?)")
1714+
// does not matter, we will fail anyway
1715+
.withCompressionDictionary(new ZstdCompressionDictionary(new DictId(ZSTD, 1), new byte[0]))
1716+
.build())
1717+
.hasMessage("Table's compressor can not accept any dictionary: {chunk_length_in_kb=16, class=org.apache.cassandra.io.compress.ZstdCompressor}")
1718+
.isInstanceOf(IllegalStateException.class);
1719+
}
1720+
1721+
@Test
1722+
public void testWritingWithZstdDictionary() throws Exception
1723+
{
1724+
final String schema = "CREATE TABLE " + qualifiedTable + " ("
1725+
+ " k int,"
1726+
+ " v1 text,"
1727+
+ " PRIMARY KEY (k)"
1728+
+ ") WITH compression = {'class': 'ZstdDictionaryCompressor'}";
1729+
1730+
CompressionDictionary dictionary = DictionaryHelper.trainDictionary(keyspace, table);
1731+
1732+
CQLSSTableWriter writer = CQLSSTableWriter.builder()
1733+
.inDirectory(dataDir)
1734+
.forTable(schema)
1735+
.using("INSERT INTO " + keyspace + '.' + table + " (k, v1) VALUES (?, ?)")
1736+
.withCompressionDictionary(dictionary)
1737+
.build();
1738+
1739+
for (int i = 0; i < 500; i++)
1740+
{
1741+
writer.addRow(i, DictionaryHelper.INSTANCE.getRandomSample());
1742+
}
1743+
1744+
writer.close();
1745+
1746+
loadSSTables(dataDir, keyspace, table);
1747+
1748+
if (verifyDataAfterLoading)
1749+
{
1750+
UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
1751+
assertNotNull(resultSet);
1752+
Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
1753+
for (int i = 0; i < 500; i++)
1754+
{
1755+
UntypedResultSet.Row row = iter.next();
1756+
assertEquals(i, row.getInt("k"));
1757+
assertNotNull(row.getString("v1"));
1758+
}
1759+
}
1760+
}
1761+
1762+
/**
1763+
* Simple generator of random data for Zstd compression dictionary and dictionary trainer.
1764+
*/
1765+
private static class DictionaryHelper
1766+
{
1767+
public static final DictionaryHelper INSTANCE = new DictionaryHelper();
1768+
private static final Random random = new Random();
1769+
1770+
private static final String[] dates = new String[] {"2025-10-20","2025-10-19","2025-10-18","2025-10-17","2025-10-16"};
1771+
private static final String[] times = new String[] {"11:00:01","11:00:02","11:00:03","11:00:04","11:00:05"};
1772+
private static final String[] levels = new String[] {"TRACE", "DEBUG", "INFO", "WARN", "ERROR"};
1773+
private static final String[] services = new String[] {"com.example.UserService", "com.example.DatabasePool", "com.example.PaymentService", "com.example.OrderService"};
1774+
1775+
private String getRandomSample()
1776+
{
1777+
return dates[random.nextInt(dates.length)] + ' ' +
1778+
times[random.nextInt(times.length)] + ' ' +
1779+
levels[random.nextInt(levels.length)] + ' ' +
1780+
services[random.nextInt(services.length)] + ' ' +
1781+
UUID.randomUUID(); // message
1782+
}
1783+
1784+
private static CompressionDictionary trainDictionary(String keyspace, String table)
1785+
{
1786+
CompressionDictionaryTrainingConfig config = CompressionDictionaryTrainingConfig
1787+
.builder()
1788+
.maxDictionarySize(65536)
1789+
.maxTotalSampleSize(1024 * 1024) // 1MB total
1790+
.build();
1791+
1792+
try (ZstdDictionaryTrainer trainer = new ZstdDictionaryTrainer(keyspace, table, config, 3))
1793+
{
1794+
trainer.start(true);
1795+
for (int i = 0; i < 25000; i++)
1796+
{
1797+
trainer.addSample(UTF8Type.instance.fromString(DictionaryHelper.INSTANCE.getRandomSample()));
1798+
}
1799+
1800+
return trainer.trainDictionary(false);
1801+
}
1802+
}
1803+
}
1804+
16931805
protected static void loadSSTables(File dataDir, final String ks, final String tb) throws ExecutionException, InterruptedException
16941806
{
16951807
SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()

0 commit comments

Comments
 (0)