1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one or more
3+ * contributor license agreements. See the NOTICE file distributed with
4+ * this work for additional information regarding copyright ownership.
5+ * The ASF licenses this file to You under the Apache License, Version 2.0
6+ * (the "License"); you may not use this file except in compliance with
7+ * the License. You may obtain a copy of the License at
8+ *
9+ * http://www.apache.org/licenses/LICENSE-2.0
10+ *
11+ * Unless required by applicable law or agreed to in writing, software
12+ * distributed under the License is distributed on an "AS IS" BASIS,
13+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ * See the License for the specific language governing permissions and
15+ * limitations under the License.
16+ */
17+
18+ package org .apache .kafka .streams .state .internals ;
19+
20+ import org .apache .kafka .streams .errors .ProcessorStateException ;
21+ import org .apache .kafka .streams .state .HeadersBytesStore ;
22+ import org .apache .kafka .streams .state .internals .metrics .RocksDBMetricsRecorder ;
23+
24+ import org .rocksdb .ColumnFamilyDescriptor ;
25+ import org .rocksdb .ColumnFamilyHandle ;
26+ import org .rocksdb .ColumnFamilyOptions ;
27+ import org .rocksdb .DBOptions ;
28+ import org .rocksdb .Options ;
29+ import org .rocksdb .RocksDB ;
30+ import org .rocksdb .RocksDBException ;
31+ import org .rocksdb .RocksIterator ;
32+ import org .slf4j .Logger ;
33+ import org .slf4j .LoggerFactory ;
34+
35+ import java .nio .charset .StandardCharsets ;
36+ import java .util .Arrays ;
37+ import java .util .List ;
38+
39+ /**
40+ * A persistent key-(value-timestamp-headers) store based on RocksDB.
41+ */
42+ public class RocksDBTimestampedStoreWithHeaders extends RocksDBStore implements HeadersBytesStore {
43+
44+ private static final Logger log = LoggerFactory .getLogger (RocksDBTimestampedStoreWithHeaders .class );
45+
46+ /**
47+ * Legacy column family name - must match {@code RocksDBTimestampedStore#TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME}
48+ */
49+
50+ private static final byte [] LEGACY_TIMESTAMPED_CF_NAME =
51+ RocksDBTimestampedStore .TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME ;
52+
53+ private static final byte [] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
54+ "keyValueWithTimestampAndHeaders" .getBytes (StandardCharsets .UTF_8 );
55+
56+ public RocksDBTimestampedStoreWithHeaders (final String name ,
57+ final String metricsScope ) {
58+ super (name , metricsScope );
59+ }
60+
61+ RocksDBTimestampedStoreWithHeaders (final String name ,
62+ final String parentDir ,
63+ final RocksDBMetricsRecorder metricsRecorder ) {
64+ super (name , parentDir , metricsRecorder );
65+ }
66+
67+ @ Override
68+ void openRocksDB (final DBOptions dbOptions ,
69+ final ColumnFamilyOptions columnFamilyOptions ) {
70+ // Check if we're upgrading from RocksDBTimestampedStore (which uses keyValueWithTimestamp CF)
71+ final List <byte []> existingCFs ;
72+ try (final Options options = new Options (dbOptions , new ColumnFamilyOptions ())) {
73+ existingCFs = RocksDB .listColumnFamilies (options , dbDir .getAbsolutePath ());
74+ } catch (final RocksDBException e ) {
75+ throw new ProcessorStateException ("Error listing column families for store " + name , e );
76+ }
77+
78+
79+ final boolean upgradingFromTimestampedStore = existingCFs .stream ()
80+ .anyMatch (cf -> Arrays .equals (cf , LEGACY_TIMESTAMPED_CF_NAME ));
81+
82+ if (upgradingFromTimestampedStore ) {
83+ openInUpgradeMode (dbOptions , columnFamilyOptions );
84+ } else {
85+ openInRegularMode (dbOptions , columnFamilyOptions );
86+ }
87+ }
88+
89+ private void openInUpgradeMode (final DBOptions dbOptions ,
90+ final ColumnFamilyOptions columnFamilyOptions ) {
91+ final List <ColumnFamilyHandle > columnFamilies = openRocksDB (
92+ dbOptions ,
93+ // we have to open the default CF to be able to open the legacy CF, but we won't use it
94+ new ColumnFamilyDescriptor (RocksDB .DEFAULT_COLUMN_FAMILY , columnFamilyOptions ),
95+ new ColumnFamilyDescriptor (LEGACY_TIMESTAMPED_CF_NAME , columnFamilyOptions ),
96+ new ColumnFamilyDescriptor (TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME , columnFamilyOptions )
97+ );
98+
99+ verifyAndCloseEmptyDefaultColumnFamily (columnFamilies .get (0 ));
100+
101+ final ColumnFamilyHandle legacyCf = columnFamilies .get (1 );
102+ final ColumnFamilyHandle headersCf = columnFamilies .get (2 );
103+
104+ // Check if legacy CF has data
105+ try (final RocksIterator legacyIter = db .newIterator (legacyCf )) {
106+ legacyIter .seekToFirst ();
107+ if (legacyIter .isValid ()) {
108+ log .info ("Opening store {} in upgrade mode" , name );
109+ cfAccessor = new DualColumnFamilyAccessor (legacyCf , headersCf ,
110+ HeadersBytesStore ::convertToHeaderFormat , this );
111+ } else {
112+ log .info ("Opening store {} in regular headers-aware mode" , name );
113+ cfAccessor = new SingleColumnFamilyAccessor (headersCf );
114+ try {
115+ db .dropColumnFamily (legacyCf );
116+ } catch (final RocksDBException e ) {
117+ throw new RuntimeException (e );
118+ } finally {
119+ legacyCf .close ();
120+ }
121+ }
122+ }
123+ }
124+
125+ private void openInRegularMode (final DBOptions dbOptions ,
126+ final ColumnFamilyOptions columnFamilyOptions ) {
127+ final List <ColumnFamilyHandle > columnFamilies = openRocksDB (
128+ dbOptions ,
129+ // we have to open the default CF to be able to open the legacy CF, but we won't use it
130+ new ColumnFamilyDescriptor (RocksDB .DEFAULT_COLUMN_FAMILY , columnFamilyOptions ),
131+ new ColumnFamilyDescriptor (TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME , columnFamilyOptions )
132+ );
133+
134+ verifyAndCloseEmptyDefaultColumnFamily (columnFamilies .get (0 ));
135+
136+ final ColumnFamilyHandle headersCf = columnFamilies .get (1 );
137+ log .info ("Opening store {} in regular headers-aware mode" , name );
138+ cfAccessor = new SingleColumnFamilyAccessor (headersCf );
139+ }
140+
141+ private void verifyAndCloseEmptyDefaultColumnFamily (final ColumnFamilyHandle columnFamilyHandle ) {
142+ try (final RocksIterator defaultIter = db .newIterator (columnFamilyHandle )) {
143+ defaultIter .seekToFirst ();
144+ if (defaultIter .isValid ()) {
145+ throw new ProcessorStateException ("Cannot upgrade directly from key-value store to headers-aware store for " + name + ". " +
146+ "Please first upgrade to RocksDBTimestampedStore, then upgrade to RocksDBTimestampedStoreWithHeaders." );
147+ }
148+ }
149+ }
150+
151+ }
0 commit comments