@@ -259,6 +259,10 @@ public void setup() throws IOException {
259259 TEST_UTIL = HBaseTestingUtility .createLocalHTU ();
260260 FILESYSTEM = TEST_UTIL .getTestFileSystem ();
261261 CONF = TEST_UTIL .getConfiguration ();
262+ // Disable directory sharing to prevent race conditions when tests run in parallel.
263+ // Each test instance gets its own isolated directories to avoid one test's tearDown()
264+ // deleting directories another parallel test is still using.
265+ CONF .setBoolean ("hbase.test.disable-directory-sharing" , true );
262266 NettyAsyncFSWALConfigHelper .setEventLoopConfig (CONF , GROUP , NioSocketChannel .class );
263267 dir = TEST_UTIL .getDataTestDir ("TestHRegion" ).toString ();
264268 method = name .getMethodName ();
@@ -1352,18 +1356,23 @@ public void testGetWhileRegionClose() throws IOException {
13521356 threads [i ].start ();
13531357 }
13541358 } finally {
1359+ done .set (true );
1360+ for (GetTillDoneOrException t : threads ) {
1361+ if (t != null ) {
1362+ try {
1363+ t .join (5000 );
1364+ } catch (InterruptedException e ) {
1365+ e .printStackTrace ();
1366+ }
1367+ }
1368+ }
13551369 if (this .region != null ) {
13561370 HBaseTestingUtility .closeRegionAndWAL (this .region );
13571371 this .region = null ;
13581372 }
13591373 }
1360- done . set ( true );
1374+ // Check for errors after threads have been stopped
13611375 for (GetTillDoneOrException t : threads ) {
1362- try {
1363- t .join ();
1364- } catch (InterruptedException e ) {
1365- e .printStackTrace ();
1366- }
13671376 if (t .e != null ) {
13681377 LOG .info ("Exception=" + t .e );
13691378 assertFalse ("Found a NPE in " + t .getName (), t .e instanceof NullPointerException );
@@ -1391,7 +1400,7 @@ class GetTillDoneOrException extends Thread {
13911400
13921401 @ Override
13931402 public void run () {
1394- while (!this .done .get ()) {
1403+ while (!this .done .get () && ! Thread . currentThread (). isInterrupted () ) {
13951404 try {
13961405 assertTrue (region .get (g ).size () > 0 );
13971406 this .count .incrementAndGet ();
@@ -4517,7 +4526,7 @@ public void checkNoError() {
45174526 @ Override
45184527 public void run () {
45194528 done = false ;
4520- while (!done ) {
4529+ while (!done && ! Thread . currentThread (). isInterrupted () ) {
45214530 synchronized (this ) {
45224531 try {
45234532 wait ();
@@ -4730,7 +4739,7 @@ public void checkNoError() {
47304739 @ Override
47314740 public void run () {
47324741 done = false ;
4733- while (!done ) {
4742+ while (!done && ! Thread . currentThread (). isInterrupted () ) {
47344743 try {
47354744 for (int r = 0 ; r < numRows ; r ++) {
47364745 byte [] row = Bytes .toBytes ("row" + r );
@@ -5264,7 +5273,7 @@ public void testParallelIncrementWithMemStoreFlush() throws Exception {
52645273 Runnable flusher = new Runnable () {
52655274 @ Override
52665275 public void run () {
5267- while (!incrementDone .get ()) {
5276+ while (!incrementDone .get () && ! Thread . currentThread (). isInterrupted () ) {
52685277 try {
52695278 region .flush (true );
52705279 } catch (Exception e ) {
@@ -5280,13 +5289,38 @@ public void run() {
52805289 long expected = (long ) threadNum * incCounter ;
52815290 Thread [] incrementers = new Thread [threadNum ];
52825291 Thread flushThread = new Thread (flusher );
5292+ flushThread .setName ("FlushThread-" + method );
52835293 for (int i = 0 ; i < threadNum ; i ++) {
52845294 incrementers [i ] = new Thread (new Incrementer (this .region , incCounter ));
52855295 incrementers [i ].start ();
52865296 }
52875297 flushThread .start ();
5288- for (int i = 0 ; i < threadNum ; i ++) {
5289- incrementers [i ].join ();
5298+ try {
5299+ for (int i = 0 ; i < threadNum ; i ++) {
5300+ incrementers [i ].join ();
5301+ }
5302+
5303+ incrementDone .set (true );
5304+ flushThread .join ();
5305+
5306+ Get get = new Get (Incrementer .incRow );
5307+ get .addColumn (Incrementer .family , Incrementer .qualifier );
5308+ get .readVersions (1 );
5309+ Result res = this .region .get (get );
5310+ List <Cell > kvs = res .getColumnCells (Incrementer .family , Incrementer .qualifier );
5311+
5312+ // we just got the latest version
5313+ assertEquals (1 , kvs .size ());
5314+ Cell kv = kvs .get (0 );
5315+ assertEquals (expected , Bytes .toLong (kv .getValueArray (), kv .getValueOffset ()));
5316+ } finally {
5317+ // Ensure flush thread is stopped even if test fails or times out
5318+ incrementDone .set (true );
5319+ flushThread .interrupt ();
5320+ flushThread .join (5000 ); // Wait up to 5 seconds for thread to stop
5321+ if (flushThread .isAlive ()) {
5322+ LOG .warn ("Flush thread did not stop within timeout for test " + method );
5323+ }
52905324 }
52915325
52925326 incrementDone .set (true );
@@ -5349,7 +5383,7 @@ public void testParallelAppendWithMemStoreFlush() throws Exception {
53495383 Runnable flusher = new Runnable () {
53505384 @ Override
53515385 public void run () {
5352- while (!appendDone .get ()) {
5386+ while (!appendDone .get () && ! Thread . currentThread (). isInterrupted () ) {
53535387 try {
53545388 region .flush (true );
53555389 } catch (Exception e ) {
@@ -5369,13 +5403,41 @@ public void run() {
53695403 }
53705404 Thread [] appenders = new Thread [threadNum ];
53715405 Thread flushThread = new Thread (flusher );
5406+ flushThread .setName ("FlushThread-" + method );
53725407 for (int i = 0 ; i < threadNum ; i ++) {
53735408 appenders [i ] = new Thread (new Appender (this .region , appendCounter ));
53745409 appenders [i ].start ();
53755410 }
53765411 flushThread .start ();
5377- for (int i = 0 ; i < threadNum ; i ++) {
5378- appenders [i ].join ();
5412+ try {
5413+ for (int i = 0 ; i < threadNum ; i ++) {
5414+ appenders [i ].join ();
5415+ }
5416+
5417+ appendDone .set (true );
5418+ flushThread .join ();
5419+
5420+ Get get = new Get (Appender .appendRow );
5421+ get .addColumn (Appender .family , Appender .qualifier );
5422+ get .readVersions (1 );
5423+ Result res = this .region .get (get );
5424+ List <Cell > kvs = res .getColumnCells (Appender .family , Appender .qualifier );
5425+
5426+ // we just got the latest version
5427+ assertEquals (1 , kvs .size ());
5428+ Cell kv = kvs .get (0 );
5429+ byte [] appendResult = new byte [kv .getValueLength ()];
5430+ System .arraycopy (kv .getValueArray (), kv .getValueOffset (), appendResult , 0 ,
5431+ kv .getValueLength ());
5432+ assertArrayEquals (expected , appendResult );
5433+ } finally {
5434+ // Ensure flush thread is stopped even if test fails or times out
5435+ appendDone .set (true );
5436+ flushThread .interrupt ();
5437+ flushThread .join (5000 ); // Wait up to 5 seconds for thread to stop
5438+ if (flushThread .isAlive ()) {
5439+ LOG .warn ("Flush thread did not stop within timeout for test " + method );
5440+ }
53795441 }
53805442
53815443 appendDone .set (true );
@@ -7337,7 +7399,7 @@ public void testMutateRowInParallel() throws Exception {
73377399 // Writer thread
73387400 Thread writerThread = new Thread (() -> {
73397401 try {
7340- while (true ) {
7402+ while (! Thread . currentThread (). isInterrupted () ) {
73417403 // If all the reader threads finish, then stop the writer thread
73427404 if (latch .await (0 , TimeUnit .MILLISECONDS )) {
73437405 return ;
@@ -7362,15 +7424,19 @@ public void testMutateRowInParallel() throws Exception {
73627424 .addColumn (fam1 , q3 , tsIncrement + 1 , Bytes .toBytes (1L ))
73637425 .addColumn (fam1 , q4 , tsAppend + 1 , Bytes .toBytes ("a" )) });
73647426 }
7427+ } catch (InterruptedException e ) {
7428+ // Test interrupted, exit gracefully
7429+ Thread .currentThread ().interrupt ();
73657430 } catch (Exception e ) {
73667431 assertionError .set (new AssertionError (e ));
73677432 }
73687433 });
7434+ writerThread .setName ("WriterThread-" + method );
73697435 writerThread .start ();
73707436
73717437 // Reader threads
73727438 for (int i = 0 ; i < numReaderThreads ; i ++) {
7373- new Thread (() -> {
7439+ Thread readerThread = new Thread (() -> {
73747440 try {
73757441 for (int j = 0 ; j < 10000 ; j ++) {
73767442 // Verify the values
@@ -7399,13 +7465,24 @@ public void testMutateRowInParallel() throws Exception {
73997465 }
74007466
74017467 latch .countDown ();
7402- }).start ();
7468+ });
7469+ readerThread .setName ("ReaderThread-" + i + "-" + method );
7470+ readerThread .start ();
74037471 }
74047472
7405- writerThread .join ();
7473+ try {
7474+ writerThread .join ();
74067475
7407- if (assertionError .get () != null ) {
7408- throw assertionError .get ();
7476+ if (assertionError .get () != null ) {
7477+ throw assertionError .get ();
7478+ }
7479+ } finally {
7480+ // Ensure writer thread is stopped on test timeout
7481+ writerThread .interrupt ();
7482+ writerThread .join (5000 );
7483+ if (writerThread .isAlive ()) {
7484+ LOG .warn ("Writer thread did not stop within timeout for test " + method );
7485+ }
74097486 }
74107487 }
74117488
0 commit comments