Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .abi-check/7.1.0/postgres.symbols.ignore
Original file line number Diff line number Diff line change
@@ -1 +1,12 @@
pgarch_start
ConfigureNamesInt_gp
child_triggers
has_update_triggers
ConfigureNamesBool_gp
aocs_beginscan
AppendOnlyBlockDirectory_GetEntry
ConfigureNamesString_gp
gp_pause_on_restore_point_replay
ConfigureNamesReal_gp
TableAmRoutine
MainLWLockNames
5 changes: 5 additions & 0 deletions GNUmakefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ installcheck-gpcheckcat:
$(call recurse,installcheck-world,gpcontrib/gp_replica_check,installcheck)
$(call recurse,installcheck-world,src/bin/pg_upgrade,check)

# Run the installcheck-hot-standby targets of src/test/regress and
# src/test/isolation2, in that order. submake-generated-headers is a
# prerequisite, so it is brought up to date before either sub-make runs.
.PHONY: installcheck-hot-standby
installcheck-hot-standby: submake-generated-headers
$(MAKE) -C src/test/regress installcheck-hot-standby
$(MAKE) -C src/test/isolation2 installcheck-hot-standby

# Run mock tests, that don't require a running server. Arguably these should
# be part of [install]check-world, but we treat them more like part of
# compilation than regression testing, in the CI. But they are too heavy-weight
Expand Down
15 changes: 12 additions & 3 deletions gpMgmt/bin/gpstart
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class GpStart:
skip_heap_checksum_validation=False,
fts_hosts=None,
etcd_hosts=None,
is_external_fts=False
is_external_fts=False,
segment_config_file=None
):
assert (specialMode in [None, 'maintenance'])
self.specialMode = specialMode
Expand All @@ -78,6 +79,7 @@ class GpStart:
self.etcd_hosts = etcd_hosts
self.is_external_fts = is_external_fts
self.singlenodemode = False
self.segment_config_file = segment_config_file

#
# Some variables that are set during execution
Expand Down Expand Up @@ -510,7 +512,11 @@ class GpStart:

logger.info("Obtaining Segment details from coordinator...")
self.dburl = dbconn.DbURL(port=self.port, dbname='template1')
self.gparray = GpArray.initFromCatalog(self.dburl, utility=True)
if self.segment_config_file:
self.gparray = GpArray.initFromFile(self.segment_config_file)
self.gparray.is_singlenode= False
else:
self.gparray = GpArray.initFromCatalog(self.dburl, utility=True)

logger.info("Setting new coordinator era")
e = GpEraFile(self.coordinator_datadir, logger=get_logger_if_verbose())
Expand Down Expand Up @@ -876,6 +882,8 @@ class GpStart:
addTo.add_option('-E', dest='etcd_hosts', type='string',default=None ,
help='specify the file that contains all etcd hosts.If this argument is set, `gpstart` will attempt'
'to start all etcd in the specified hosts')
addTo.add_option('-f', '--segment_config_file', dest='segment_config_file', type='string', default=None,
help='specify the gp_segment_configuration file to load for this cluster')

parser.set_defaults(verbose=False, filters=[], slice=(None, None))

Expand Down Expand Up @@ -922,7 +930,8 @@ class GpStart:
skip_heap_checksum_validation=options.skip_heap_checksum_validation,
fts_hosts=options.fts_hosts,
etcd_hosts=options.etcd_hosts,
is_external_fts=external_fts
is_external_fts=external_fts,
segment_config_file=options.segment_config_file
)


Expand Down
8 changes: 8 additions & 0 deletions src/backend/access/heap/heapam.c
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,14 @@ heapgetpage(TableScanDesc sscan, BlockNumber page)

LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

#ifdef FAULT_INJECTOR
FaultInjector_InjectFaultIfSet(
"heapgetpage_after_unlock_buffer",
DDLNotSpecified,
"", /* databaseName */
RelationGetRelationName(scan->rs_base.rs_rd)); /* tableName */
#endif

Assert(ntup <= MaxHeapTuplesPerPage);
scan->rs_ntuples = ntup;
}
Expand Down
11 changes: 11 additions & 0 deletions src/backend/access/rmgrdesc/standbydesc.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ standby_desc(StringInfo buf, XLogReaderState *record)
xlrec->dbId, xlrec->tsId,
xlrec->relcacheInitFileInval);
}
else if (info == XLOG_LATESTCOMPLETED_GXID)
{
DistributedTransactionId gxid;

gxid = *((DistributedTransactionId *) rec);
appendStringInfo(buf, UINT64_FORMAT, gxid);
}

}

const char *
Expand All @@ -84,6 +92,9 @@ standby_identify(uint8 info)
case XLOG_INVALIDATIONS:
id = "INVALIDATIONS";
break;
case XLOG_LATESTCOMPLETED_GXID:
id = "XLOG_LATESTCOMPLETED_GXID";
break;
}

return id;
Expand Down
45 changes: 45 additions & 0 deletions src/backend/access/transam/README
Original file line number Diff line number Diff line change
Expand Up @@ -897,3 +897,48 @@ yet simplifies emulation of subtransactions considerably.

Further details on locking mechanics in recovery are given in comments
with the Lock rmgr code.

Distributed Transaction Emulation during Recovery
-------------------------------------

In GPDB, the MVCC snapshot also includes distributed transactions (aka dtx).
Accordingly, on a hot standby we also emulate running dtx. The way to do that
is to re-use the shmCommittedGxidArray which has been used on a primary for dtx
recovery: it tracks all the 2PC dtx that have their PREPARE phase done,
but for which the COMMIT phase hasn't finished (i.e. the window between the
XLOG_XACT_DISTRIBUTED_COMMIT record being written and the
XLOG_XACT_DISTRIBUTED_FORGET record being written on the QD). On a hot standby,
any dtx shown in that array are regarded as in-progress. The MVCC snapshot does
not really need to account for dtx not in that array: for a dtx that hasn't
finished PREPARE, we know no segment has committed any data yet; for a dtx that
has finished COMMIT, we know all segments have committed their data.

Note: dtxes that are preparing will not be tracked in this array, and thus will
not be included in this snapshot. This is slightly different from a primary QD,
where such transactions would have been included in the distributed snapshot's
inProgressXidArray (as we construct the inProgressXidArray from the PGXACTs that
would contain the dummy entries for prepared transactions). However, as
mentioned in CreateDistributedSnapshot, including these is not a requirement for
correctness.

Note: aborted/aborting dtxes are not accounted for by the standby either. Those
are the dtxes that encountered an error while preparing. As with the previous
point, the standby does not need to be aware of them for correctness. It is also
worth noting that if a dtx encounters an error after being prepared, it can no
longer be aborted and must be committed by the dtx recovery process. Until
committed, such a dtx will be seen as in-progress by the standby.

For 1PC dtx, however, there is a known limitation where the hot standby won't
see the last 1PC (or the last few dtxes if they are all 1PC). Because a 1PC
does not have any WAL on the QD, the standby QD won't advance its
latestCompletedGxid, so its distributed snapshot horizon does not include the
last 1PC - it would view the last 1PC as not yet started or at best still in
progress. Only when another 2PC comes along does the standby advance its
latestCompletedGxid, at which point its distributed snapshot will include the
previous 1PC.

We don't emulate the full architecture of "running transaction" for dtx because
that is unnecessary, at least ATM. For example, we don't create a dtx-version
of XLOG_RUNNING_XACTS, because we already have that information as part of the
extended checkpoint (see TMGXACT_CHECKPOINT). We also don't need to emulate
other members in RunningTransactionsData, like subxid or xid-pruning related
variables because those do not apply to dtx.
67 changes: 36 additions & 31 deletions src/backend/access/transam/xact.c
Original file line number Diff line number Diff line change
Expand Up @@ -2475,11 +2475,10 @@ StartTransaction(void)

/*
* Transactions may be started while recovery is in progress, if
* hot standby is enabled. This mode is not supported in
* Cloudberry yet.
* hot standby is enabled.
*/
AssertImply(DistributedTransactionContext != DTX_CONTEXT_LOCAL_ONLY,
!s->startedInRecovery);
EnableHotStandby || !s->startedInRecovery);
/*
* MPP Modification
*
Expand Down Expand Up @@ -2526,20 +2525,39 @@ StartTransaction(void)

case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER:
case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER:
/*
* Sanity check for the global xid.
*
* Note for hot standby dispatch: the standby QEs are still
* writers, just like primary QEs for SELECT queries. But
* hot standby dispatch never has a valid gxid, so we skip
* the gxid checks for the standby QEs.
*/
if (!IS_HOT_STANDBY_QE())
{
if (QEDtxContextInfo.distributedXid == InvalidDistributedTransactionId)
elog(ERROR,
"distributed transaction id is invalid in context %s",
DtxContextToString(DistributedTransactionContext));

/*
* Update distributed XID info, this is only used for
* debugging.
*/
LocalDistribXactData *ele = &MyProc->localDistribXactData;
ele->distribXid = QEDtxContextInfo.distributedXid;
ele->state = LOCALDISTRIBXACT_STATE_ACTIVE;
}
else
Assert(QEDtxContextInfo.distributedXid == InvalidDistributedTransactionId);

/* fall through */
case DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT:
{
/* If we're running in test-mode insert a delay in writer. */
if (gp_enable_slow_writer_testmode)
pg_usleep(500000);

if (DistributedTransactionContext != DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT &&
QEDtxContextInfo.distributedXid == InvalidDistributedTransactionId)
{
elog(ERROR,
"distributed transaction id is invalid in context %s",
DtxContextToString(DistributedTransactionContext));
}

/*
* Snapshot must not be created before setting transaction
* isolation level.
Expand All @@ -2552,28 +2570,14 @@ StartTransaction(void)
XactReadOnly = isMppTxOptions_ReadOnly(
QEDtxContextInfo.distributedTxnOptions);

/* a hot standby transaction must be read-only */
AssertImply(IS_HOT_STANDBY_QE(), XactReadOnly);

/*
* MPP: we're a QE Writer.
*/
MyTmGxact->gxid = QEDtxContextInfo.distributedXid;

if (DistributedTransactionContext ==
DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER ||
DistributedTransactionContext ==
DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER)
{
Assert(QEDtxContextInfo.distributedXid !=
InvalidDistributedTransactionId);

/*
* Update distributed XID info, this is only used for
* debugging.
*/
LocalDistribXactData *ele = &MyProc->localDistribXactData;
ele->distribXid = QEDtxContextInfo.distributedXid;
ele->state = LOCALDISTRIBXACT_STATE_ACTIVE;
}

if (SharedLocalSnapshotSlot != NULL)
{
LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE);
Expand Down Expand Up @@ -6880,8 +6884,8 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xact_distrib xl_distrib;
xl_xact_deldbs xl_deldbs;
XLogRecPtr recptr;
bool isOnePhaseQE = (Gp_role == GP_ROLE_EXECUTE && MyTmGxactLocal->isOnePhaseCommit);
bool isDtxPrepared = isPreparedDtxTransaction();
DistributedTransactionId distrib_xid = getDistributedTransactionId();

uint8 info;

Expand Down Expand Up @@ -6971,10 +6975,11 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
}

if (isDtxPrepared || isOnePhaseQE)
/* include distributed xid if there's one */
if (distrib_xid != InvalidDistributedTransactionId)
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_DISTRIB;
xl_distrib.distrib_xid = getDistributedTransactionId();
xl_distrib.distrib_xid = distrib_xid;
}

#if 0
Expand Down
Loading