Skip to content

Commit 1bf0397

Browse files
committed
Truncate mutation journal as logs get reconciled
patch by Aleksey Yeschenko; reviewed by Alex Petrov for CASSANDRA-20710
1 parent b67f4e2 commit 1bf0397

38 files changed

+1159
-248
lines changed

src/java/org/apache/cassandra/journal/ActiveSegment.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,19 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
7979
private final Ref<Segment<K, V>> selfRef;
8080

8181
final InMemoryIndex<K> index;
82+
final KeyStats.Active<K> keyStats;
8283

8384
private ActiveSegment(
84-
Descriptor descriptor, Params params, InMemoryIndex<K> index, Metadata metadata, KeySupport<K> keySupport)
85+
Descriptor descriptor,
86+
Params params,
87+
InMemoryIndex<K> index,
88+
Metadata metadata,
89+
KeyStats.Active<K> keyStats,
90+
KeySupport<K> keySupport)
8591
{
8692
super(descriptor, metadata, keySupport);
8793
this.index = index;
94+
this.keyStats = keyStats;
8895
try
8996
{
9097
channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
@@ -98,16 +105,12 @@ private ActiveSegment(
98105
}
99106
}
100107

101-
public CommitLogPosition currentPosition()
102-
{
103-
return new CommitLogPosition(id(), (int) allocateOffset);
104-
}
105-
106-
static <K, V> ActiveSegment<K, V> create(Descriptor descriptor, Params params, KeySupport<K> keySupport)
108+
static <K, V> ActiveSegment<K, V> create(
109+
Descriptor descriptor, Params params, KeySupport<K> keySupport, KeyStats.Factory<K> keyStatsFactory)
107110
{
108111
InMemoryIndex<K> index = InMemoryIndex.create(keySupport);
109112
Metadata metadata = Metadata.empty();
110-
return new ActiveSegment<>(descriptor, params, index, metadata, keySupport);
113+
return new ActiveSegment<>(descriptor, params, index, metadata, keyStatsFactory.create(), keySupport);
111114
}
112115

113116
@Override
@@ -116,6 +119,16 @@ public InMemoryIndex<K> index()
116119
return index;
117120
}
118121

122+
public KeyStats.Active<K> keyStats()
123+
{
124+
return keyStats;
125+
}
126+
127+
public CommitLogPosition currentPosition()
128+
{
129+
return new CommitLogPosition(id(), (int) allocateOffset);
130+
}
131+
119132
boolean isEmpty()
120133
{
121134
return allocateOffset == 0;
@@ -225,6 +238,7 @@ void persistComponents()
225238
{
226239
index.persist(descriptor);
227240
metadata.persist(descriptor);
241+
keyStats.persist(descriptor);
228242
SyncUtil.trySyncDir(descriptor.directory);
229243
}
230244

@@ -236,6 +250,7 @@ private void discard()
236250
descriptor.fileFor(Component.DATA).deleteIfExists();
237251
descriptor.fileFor(Component.INDEX).deleteIfExists();
238252
descriptor.fileFor(Component.METADATA).deleteIfExists();
253+
descriptor.fileFor(Component.KEYSTATS).deleteIfExists();
239254
}
240255

241256
@Override
@@ -290,6 +305,7 @@ public String name()
290305
}
291306
}
292307

308+
@Override
293309
public boolean isFlushed(long position)
294310
{
295311
return writtenTo >= position;
@@ -465,18 +481,14 @@ final class Allocation extends RecordPointer
465481
this.buffer = buffer;
466482
}
467483

468-
Segment<K, V> segment()
469-
{
470-
return ActiveSegment.this;
471-
}
472-
473484
void write(K id, ByteBuffer record)
474485
{
475486
try
476487
{
477488
EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
478-
metadata.update();
479489
index.update(id, position, length);
490+
keyStats.update(id);
491+
metadata.update();
480492
}
481493
catch (IOException e)
482494
{
@@ -508,6 +520,7 @@ void writeInternal(K id, ByteBuffer record)
508520
{
509521
EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
510522
index.update(id, position, length);
523+
keyStats.update(id);
511524
metadata.update();
512525
}
513526
catch (IOException e)

src/java/org/apache/cassandra/journal/Component.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@
2424

2525
import static accord.utils.SortedArrays.SortedArrayList.ofSorted;
2626

27-
enum Component
27+
public enum Component
2828
{
29-
DATA ("data"),
30-
INDEX ("indx"),
31-
METADATA ("meta");
29+
DATA ("data"),
30+
INDEX ("indx"),
31+
METADATA ("meta"),
32+
KEYSTATS ("keys");
3233
//OFFSET_MAP (".offs"),
3334
//INVLALIDATIONS (".invl");
3435

3536
public static final List<Component> VALUES = ofSorted(values());
37+
3638
final String extension;
3739

3840
Component(String extension)

src/java/org/apache/cassandra/journal/Descriptor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.util.regex.Matcher;
2424
import java.util.regex.Pattern;
2525

26+
import com.google.common.annotations.VisibleForTesting;
27+
2628
import org.apache.cassandra.io.util.File;
2729

2830
import static java.lang.String.format;
@@ -90,7 +92,8 @@ public final class Descriptor implements Comparable<Descriptor>
9092
this.userVersion = userVersion;
9193
}
9294

93-
static Descriptor create(File directory, long timestamp, int userVersion)
95+
@VisibleForTesting
96+
public static Descriptor create(File directory, long timestamp, int userVersion)
9497
{
9598
return new Descriptor(directory, timestamp, 1, CURRENT_JOURNAL_VERSION, userVersion);
9699
}
@@ -114,12 +117,12 @@ public static Descriptor fromFile(File file)
114117
return fromName(file.parent(), file.name());
115118
}
116119

117-
File fileFor(Component component)
120+
public File fileFor(Component component)
118121
{
119122
return new File(directory, formatFileName(component));
120123
}
121124

122-
File tmpFileFor(Component component)
125+
public File tmpFileFor(Component component)
123126
{
124127
return new File(directory, formatFileName(component) + '.' + TMP_SUFFIX);
125128
}

src/java/org/apache/cassandra/journal/DumpUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,6 @@ public static void dumpMetadata(Descriptor descriptor, Consumer<String> out)
3737

3838
public static <K, V> StaticSegment<K, V> open(Descriptor descriptor, KeySupport<K> keySupport)
3939
{
40-
return StaticSegment.open(descriptor, keySupport);
40+
return StaticSegment.open(descriptor, keySupport, KeyStats.Factory.noop());
4141
}
4242
}

src/java/org/apache/cassandra/journal/EntrySerializer.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,8 @@
2727
import java.nio.ByteBuffer;
2828
import java.util.zip.CRC32;
2929

30-
import static org.apache.cassandra.journal.Journal.validateCRC;
31-
3230
/**
3331
* Entry format:
34-
*
3532
* [Total Size (4 bytes)]
3633
* [Header (variable size)]
3734
* [Header CRC (4 bytes)]
@@ -95,10 +92,10 @@ static <K> void read(EntryHolder<K> into,
9592
CRC32 crc = Crc.crc32();
9693
int headerSize = EntrySerializer.headerSize(keySupport, userVersion);
9794
int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize);
98-
validateCRC(crc, headerCrc);
95+
Crc.validate(crc, headerCrc);
9996

10097
int recordCrc = readAndUpdateRecordCrc(crc, from, start + totalSize);
101-
validateCRC(crc, recordCrc);
98+
Crc.validate(crc, recordCrc);
10299
}
103100

104101
readValidated(into, from, start, keySupport, userVersion);
@@ -142,7 +139,7 @@ static <K> int tryRead(EntryHolder<K> into,
142139
int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize);
143140
try
144141
{
145-
validateCRC(crc, headerCrc);
142+
Crc.validate(crc, headerCrc);
146143
}
147144
catch (IOException e)
148145
{
@@ -152,7 +149,7 @@ static <K> int tryRead(EntryHolder<K> into,
152149
int recordCrc = readAndUpdateRecordCrc(crc, from, start + totalSize);
153150
try
154151
{
155-
validateCRC(crc, recordCrc);
152+
Crc.validate(crc, recordCrc);
156153
}
157154
catch (IOException e)
158155
{

0 commit comments

Comments
 (0)