Skip to content

Commit ccf12ef

Browse files
committed
Add accord journal standalone dump tool
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20738
1 parent 674b8d5 commit ccf12ef

File tree

15 files changed

+500
-13
lines changed

15 files changed

+500
-13
lines changed

.gitmodules

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[submodule "modules/accord"]
22
path = modules/accord
33
url = https://github.com/apache/cassandra-accord.git
4-
branch = trunk
4+
branch = trunk

src/java/org/apache/cassandra/config/AccordSpec.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,10 +196,11 @@ public static class JournalSpec implements Params
196196
private volatile long flushCombinedBlockPeriod = Long.MIN_VALUE;
197197
public Version version = Version.DOWNGRADE_SAFE_VERSION;
198198

199-
public void setFlushPeriod(DurationSpec newFlushPeriod)
199+
public JournalSpec setFlushPeriod(DurationSpec newFlushPeriod)
200200
{
201201
flushPeriod = newFlushPeriod;
202202
flushCombinedBlockPeriod = Long.MIN_VALUE;
203+
return this;
203204
}
204205

205206
public void setPeriodicFlushLagBlock(DurationSpec newPeriodicFlushLagBlock)

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2234,7 +2234,7 @@ public static void createAllDirectories()
22342234
}
22352235
catch (FSWriteError e)
22362236
{
2237-
throw new IllegalStateException(e.getCause().getMessage() + "; unable to start server");
2237+
throw new IllegalStateException(e.getCause().getMessage() + "; unable to start server", e);
22382238
}
22392239
}
22402240

src/java/org/apache/cassandra/journal/Descriptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ static Descriptor fromName(File directory, String name)
109109
return new Descriptor(directory, timestamp, generation, journalVersion, userVersion);
110110
}
111111

112-
static Descriptor fromFile(File file)
112+
public static Descriptor fromFile(File file)
113113
{
114114
return fromName(file.parent(), file.name());
115115
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.journal;
20+
21+
import java.util.function.Consumer;
22+
23+
/**
24+
* Helper file to avoid exposing components outside their package-local visibility scope
25+
*/
26+
public class DumpUtil
27+
{
28+
public static void dumpMetadata(Descriptor descriptor, Consumer<String> out)
29+
{
30+
if (Component.METADATA.existsFor(descriptor))
31+
{
32+
out.accept(Metadata.load(descriptor).toString());
33+
}
34+
else
35+
out.accept("Metadata absent for " + descriptor);
36+
}
37+
38+
public static <K, V> StaticSegment<K, V> open(Descriptor descriptor, KeySupport<K> keySupport)
39+
{
40+
return StaticSegment.open(descriptor, keySupport);
41+
}
42+
}

src/java/org/apache/cassandra/journal/Metadata.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,4 +154,13 @@ static <K> Metadata rebuildAndPersist(Descriptor descriptor, KeySupport<K> keySu
154154
metadata.persist(descriptor);
155155
return metadata;
156156
}
157+
158+
@Override
159+
public String toString()
160+
{
161+
return "Metadata{" +
162+
"fsyncLimit=" + fsyncLimit +
163+
", recordsCount=" + recordsCount +
164+
'}';
165+
}
157166
}

src/java/org/apache/cassandra/journal/StaticSegment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into)
289289
/**
290290
* Iterate over and invoke the supplied callback on every record.
291291
*/
292-
void forEachRecord(RecordConsumer<K> consumer)
292+
public void forEachRecord(RecordConsumer<K> consumer)
293293
{
294294
try (SequentialReader<K> reader = sequentialReader(descriptor, keySupport, fsyncLimit))
295295
{

src/java/org/apache/cassandra/net/InboundMessageHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.function.Consumer;
2525

26+
import org.apache.cassandra.utils.ByteBufferUtil;
2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
2829

@@ -167,7 +168,14 @@ private void processSmallMessage(ShareableBytes bytes, int size, Header header)
167168
{
168169
Message<?> m = serializer.deserialize(in, header, version);
169170
if (in.available() > 0) // bytes remaining after deser: deserializer is busted
170-
throw new InvalidSerializedSizeException(header.verb, size, size - in.available());
171+
{
172+
Throwable t = new InvalidSerializedSizeException(header.verb, size, size - in.available());
173+
ByteBuffer clone = bytes.get();
174+
clone.limit(clone.position() + size); // cap to expected message size
175+
logger.error("Could not deserialize the message: {}", ByteBufferUtil.bytesToHex(clone), t);
176+
throw t;
177+
178+
}
171179
message = m;
172180
}
173181
catch (IncompatibleSchemaException e)

src/java/org/apache/cassandra/schema/SchemaConstants.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,14 @@ public static boolean isValidName(String name)
9292
emptyVersion = UUID.nameUUIDFromBytes(Digest.forSchema().digest());
9393
}
9494

95+
/**
96+
* @return whether the table is an Accord Journal tabe
97+
*/
98+
public static boolean isAccordJournal(String keyspaceName, String tableName)
99+
{
100+
return keyspaceName.equals(SchemaConstants.ACCORD_KEYSPACE_NAME) && tableName.equals(AccordKeyspace.JOURNAL);
101+
}
102+
95103
/**
96104
* @return whether or not the keyspace is a really system one (w/ LocalStrategy, unmodifiable, hardcoded)
97105
*/

src/java/org/apache/cassandra/service/accord/AccordJournal.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ public Builder load(int commandStoreId, TxnId txnId)
415415
return loadDiffs(commandStoreId, txnId, Load.ALL);
416416
}
417417

418-
private <BUILDER extends FlyweightImage> BUILDER readAll(JournalKey key)
418+
public <BUILDER extends FlyweightImage> BUILDER readAll(JournalKey key)
419419
{
420420
BUILDER builder = (BUILDER) key.type.serializer.mergerFor();
421421
// TODO (expected): this can be further improved to avoid allocating lambdas
@@ -425,6 +425,11 @@ private <BUILDER extends FlyweightImage> BUILDER readAll(JournalKey key)
425425
return builder;
426426
}
427427

428+
public void forEachEntry(JournalKey key, AccordJournalTable.Reader reader)
429+
{
430+
journalTable.readAll(key, reader);
431+
}
432+
428433
private <T> RecordPointer appendInternal(JournalKey key, T write)
429434
{
430435
AccordJournalValueSerializers.FlyweightSerializer<T, ?> serializer = (AccordJournalValueSerializers.FlyweightSerializer<T, ?>) key.type.serializer;

0 commit comments

Comments
 (0)