Skip to content

Commit fcfc674

Browse files
committed
HBASE-30086 Rewrite TestFromClientSide related tests
1 parent b0ea092 commit fcfc674

31 files changed

+5397
-5356
lines changed
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import static org.hamcrest.CoreMatchers.instanceOf;
21+
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
import static org.junit.jupiter.api.Assertions.assertTrue;
24+
import static org.junit.jupiter.api.Assertions.fail;
25+
26+
import java.io.IOException;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.NavigableSet;
30+
import java.util.concurrent.atomic.AtomicBoolean;
31+
import java.util.concurrent.atomic.AtomicLong;
32+
import org.apache.hadoop.conf.Configuration;
33+
import org.apache.hadoop.fs.FileSystem;
34+
import org.apache.hadoop.fs.Path;
35+
import org.apache.hadoop.hbase.DoNotRetryIOException;
36+
import org.apache.hadoop.hbase.ExtendedCell;
37+
import org.apache.hadoop.hbase.HBaseTestingUtil;
38+
import org.apache.hadoop.hbase.HConstants;
39+
import org.apache.hadoop.hbase.TableName;
40+
import org.apache.hadoop.hbase.TableNameTestExtension;
41+
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
42+
import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
43+
import org.apache.hadoop.hbase.regionserver.HRegion;
44+
import org.apache.hadoop.hbase.regionserver.HStore;
45+
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
46+
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
47+
import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner;
48+
import org.apache.hadoop.hbase.regionserver.ScanInfo;
49+
import org.apache.hadoop.hbase.regionserver.StoreScanner;
50+
import org.apache.hadoop.hbase.util.Bytes;
51+
import org.apache.hadoop.hbase.wal.WAL;
52+
import org.junit.jupiter.api.AfterAll;
53+
import org.junit.jupiter.api.Test;
54+
import org.junit.jupiter.api.extension.RegisterExtension;
55+
56+
public class FromClientSideScanExcpetionTestBase {
57+
58+
protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
59+
60+
protected static byte[] FAMILY = Bytes.toBytes("testFamily");
61+
62+
protected static int SLAVES = 3;
63+
64+
@RegisterExtension
65+
private TableNameTestExtension name = new TableNameTestExtension();
66+
67+
@AfterAll
68+
public static void tearDownAfterClass() throws Exception {
69+
TEST_UTIL.shutdownMiniCluster();
70+
}
71+
72+
private static AtomicBoolean ON = new AtomicBoolean(false);
73+
private static AtomicLong REQ_COUNT = new AtomicLong(0);
74+
private static AtomicBoolean IS_DO_NOT_RETRY = new AtomicBoolean(false); // whether to throw
75+
// DNRIOE
76+
private static AtomicBoolean THROW_ONCE = new AtomicBoolean(true); // whether to only throw once
77+
78+
private static void reset() {
79+
ON.set(false);
80+
REQ_COUNT.set(0);
81+
IS_DO_NOT_RETRY.set(false);
82+
THROW_ONCE.set(true);
83+
}
84+
85+
private static void inject() {
86+
ON.set(true);
87+
}
88+
89+
protected static void startCluster() throws Exception {
90+
Configuration conf = TEST_UTIL.getConfiguration();
91+
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
92+
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000);
93+
conf.setClass(HConstants.REGION_IMPL, MyHRegion.class, HRegion.class);
94+
conf.setBoolean("hbase.client.log.scanner.activity", true);
95+
// We need more than one region server in this test
96+
TEST_UTIL.startMiniCluster(SLAVES);
97+
}
98+
99+
public static final class MyHRegion extends HRegion {
100+
101+
@SuppressWarnings("deprecation")
102+
public MyHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
103+
RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
104+
super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
105+
}
106+
107+
@Override
108+
protected HStore instantiateHStore(ColumnFamilyDescriptor family, boolean warmup)
109+
throws IOException {
110+
return new MyHStore(this, family, conf, warmup);
111+
}
112+
}
113+
114+
public static final class MyHStore extends HStore {
115+
116+
public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam,
117+
boolean warmup) throws IOException {
118+
super(region, family, confParam, warmup);
119+
}
120+
121+
@Override
122+
protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
123+
NavigableSet<byte[]> targetCols, long readPt) throws IOException {
124+
return scan.isReversed()
125+
? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
126+
: new MyStoreScanner(this, scanInfo, scan, targetCols, readPt);
127+
}
128+
}
129+
130+
public static final class MyStoreScanner extends StoreScanner {
131+
public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
132+
long readPt) throws IOException {
133+
super(store, scanInfo, scan, columns, readPt);
134+
}
135+
136+
@Override
137+
protected List<KeyValueScanner> selectScannersFrom(HStore store,
138+
List<? extends KeyValueScanner> allScanners) {
139+
List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners);
140+
List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
141+
for (KeyValueScanner scanner : scanners) {
142+
newScanners.add(new DelegatingKeyValueScanner(scanner) {
143+
@Override
144+
public boolean reseek(ExtendedCell key) throws IOException {
145+
if (ON.get()) {
146+
REQ_COUNT.incrementAndGet();
147+
if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) {
148+
if (IS_DO_NOT_RETRY.get()) {
149+
throw new DoNotRetryIOException("Injected exception");
150+
} else {
151+
throw new IOException("Injected exception");
152+
}
153+
}
154+
}
155+
return super.reseek(key);
156+
}
157+
});
158+
}
159+
return newScanners;
160+
}
161+
}
162+
163+
/**
164+
* Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving
165+
* the server side RegionScanner to be in dirty state. The client has to ensure that the
166+
* ClientScanner does not get an exception and also sees all the data.
167+
*/
168+
@Test
169+
public void testClientScannerIsResetWhenScanThrowsIOException()
170+
throws IOException, InterruptedException {
171+
reset();
172+
THROW_ONCE.set(true); // throw exceptions only once
173+
TableName tableName = name.getTableName();
174+
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
175+
int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
176+
TEST_UTIL.getAdmin().flush(tableName);
177+
inject();
178+
int actualRowCount = HBaseTestingUtil.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
179+
assertEquals(rowCount, actualRowCount);
180+
}
181+
assertTrue(REQ_COUNT.get() > 0);
182+
}
183+
184+
/**
185+
* Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation
186+
* is that the exception will bubble up to the client scanner instead of being retried.
187+
*/
188+
@Test
189+
public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE()
190+
throws IOException, InterruptedException {
191+
reset();
192+
IS_DO_NOT_RETRY.set(true);
193+
TableName tableName = name.getTableName();
194+
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
195+
TEST_UTIL.loadTable(t, FAMILY, false);
196+
TEST_UTIL.getAdmin().flush(tableName);
197+
inject();
198+
HBaseTestingUtil.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
199+
fail("Should have thrown an exception");
200+
} catch (DoNotRetryIOException expected) {
201+
// expected
202+
}
203+
assertTrue(REQ_COUNT.get() > 0);
204+
}
205+
206+
/**
207+
* Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is
208+
* that the we will keep on retrying, but fail after the retries are exhausted instead of retrying
209+
* indefinitely.
210+
*/
211+
@Test
212+
public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE()
213+
throws IOException, InterruptedException {
214+
TableName tableName = name.getTableName();
215+
reset();
216+
THROW_ONCE.set(false); // throw exceptions in every retry
217+
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
218+
TEST_UTIL.loadTable(t, FAMILY, false);
219+
TEST_UTIL.getAdmin().flush(tableName);
220+
inject();
221+
HBaseTestingUtil.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
222+
fail("Should have thrown an exception");
223+
} catch (ScannerResetException expected) {
224+
// expected
225+
} catch (RetriesExhaustedException e) {
226+
// expected
227+
assertThat(e.getCause(), instanceOf(ScannerResetException.class));
228+
}
229+
assertTrue(REQ_COUNT.get() >= 3);
230+
}
231+
}

0 commit comments

Comments
 (0)