1515package org .hyperledger .besu .ethereum .eth .transactions ;
1616
1717import static org .hyperledger .besu .ethereum .eth .transactions .PendingTransaction .MAX_SCORE ;
18+ import static org .hyperledger .besu .ethereum .eth .transactions .TransactionPoolStructuredLogUtils .logInvalid ;
19+ import static org .hyperledger .besu .ethereum .eth .transactions .TransactionPoolStructuredLogUtils .logStart ;
20+ import static org .hyperledger .besu .ethereum .eth .transactions .TransactionPoolStructuredLogUtils .logStop ;
1821import static org .hyperledger .besu .ethereum .transaction .TransactionInvalidReason .CHAIN_HEAD_NOT_AVAILABLE ;
1922import static org .hyperledger .besu .ethereum .transaction .TransactionInvalidReason .CHAIN_HEAD_WORLD_STATE_NOT_AVAILABLE ;
2023import static org .hyperledger .besu .ethereum .transaction .TransactionInvalidReason .INTERNAL_ERROR ;
5861import java .io .FileReader ;
5962import java .io .FileWriter ;
6063import java .io .IOException ;
61- import java .math .BigInteger ;
6264import java .nio .charset .StandardCharsets ;
6365import java .nio .file .Files ;
66+ import java .time .Duration ;
6467import java .util .ArrayList ;
6568import java .util .Collection ;
6669import java .util .Comparator ;
100103 */
101104public class TransactionPool implements BlockAddedObserver {
102105 private static final Logger LOG = LoggerFactory .getLogger (TransactionPool .class );
103- private static final Logger LOG_FOR_REPLAY = LoggerFactory .getLogger ("LOG_FOR_REPLAY" );
104106 private final Supplier <PendingTransactions > pendingTransactionsSupplier ;
105107 private final BlobCache cacheForBlobsOfTransactionsAddedToABlock ;
106108 private volatile PendingTransactions pendingTransactions = new DisabledPendingTransactions ();
@@ -147,46 +149,6 @@ public TransactionPool(
147149 subscribeDroppedTransactions (transactionBroadcaster );
148150 }
149151
150- private void initLogForReplay () {
151- LOG_FOR_REPLAY .trace ("START" );
152- // log the initial block header data
153- LOG_FOR_REPLAY
154- .atTrace ()
155- .setMessage ("{},{},{},{}" )
156- .addArgument (() -> getChainHeadBlockHeader ().map (BlockHeader ::getNumber ).orElse (0L ))
157- .addArgument (
158- () ->
159- getChainHeadBlockHeader ()
160- .flatMap (BlockHeader ::getBaseFee )
161- .map (Wei ::getAsBigInteger )
162- .orElse (BigInteger .ZERO ))
163- .addArgument (() -> getChainHeadBlockHeader ().map (BlockHeader ::getGasUsed ).orElse (0L ))
164- .addArgument (() -> getChainHeadBlockHeader ().map (BlockHeader ::getGasLimit ).orElse (0L ))
165- .log ();
166- // log the priority senders
167- LOG_FOR_REPLAY
168- .atTrace ()
169- .setMessage ("{}" )
170- .addArgument (
171- () ->
172- configuration .getPrioritySenders ().stream ()
173- .map (Address ::toHexString )
174- .collect (Collectors .joining ("," )))
175- .log ();
176- // log the max prioritized txs by type
177- LOG_FOR_REPLAY
178- .atTrace ()
179- .setMessage ("{}" )
180- .addArgument (
181- () ->
182- configuration .getMaxPrioritizedTransactionsByType ().entrySet ().stream ()
183- .map (e -> e .getKey ().name () + "=" + e .getValue ())
184- .collect (Collectors .joining ("," )))
185- .log ();
186- // log configuration: minScore
187- LOG_FOR_REPLAY .atTrace ().setMessage ("{}" ).addArgument (configuration ::getMinScore ).log ();
188- }
189-
190152 @ VisibleForTesting
191153 void handleConnect (final EthPeer peer ) {
192154 transactionBroadcaster .relayTransactionPoolTo (
@@ -196,17 +158,20 @@ void handleConnect(final EthPeer peer) {
196158 public ValidationResult <TransactionInvalidReason > addTransactionViaApi (
197159 final Transaction transaction ) {
198160
199- final var result = addTransaction (transaction , true , MAX_SCORE );
161+ final boolean hasPriority = isPriorityTransaction (transaction , true );
162+ final var result = addTransaction (transaction , true , hasPriority , MAX_SCORE );
200163 if (result .isValid ()) {
201164 localSenders .add (transaction .getSender ());
202165 transactionBroadcaster .onTransactionsAdded (List .of (transaction ));
166+ } else {
167+ logInvalid (transaction , result , true , hasPriority );
203168 }
204169 return result ;
205170 }
206171
207172 public Map <Hash , ValidationResult <TransactionInvalidReason >> addRemoteTransactions (
208173 final Collection <Transaction > transactions ) {
209- final long started = System .currentTimeMillis ();
174+ final long started = System .nanoTime ();
210175 final int initialCount = transactions .size ();
211176 final List <Transaction > addedTransactions = new ArrayList <>(initialCount );
212177 LOG .trace ("Adding {} remote transactions" , initialCount );
@@ -217,27 +182,25 @@ public Map<Hash, ValidationResult<TransactionInvalidReason>> addRemoteTransactio
217182 Collectors .toMap (
218183 Transaction ::getHash ,
219184 transaction -> {
220- final var result = addTransaction (transaction , false , MAX_SCORE );
185+ final boolean hasPriority = isPriorityTransaction (transaction , false );
186+ final var result = addTransaction (transaction , false , hasPriority , MAX_SCORE );
221187 if (result .isValid ()) {
222188 addedTransactions .add (transaction );
189+ } else {
190+ logInvalid (transaction , result , false , hasPriority );
223191 }
224192 return result ;
225193 },
226194 (transaction1 , transaction2 ) -> transaction1 ));
227195
228196 if (isEnabled ()) {
229- LOG_FOR_REPLAY
230- .atTrace ()
231- .setMessage ("S,{}" )
232- .addArgument (() -> pendingTransactions .logStats ())
233- .log ();
197+ TransactionPoolStructuredLogUtils .logStats (pendingTransactions );
234198 }
235199
236200 LOG .atTrace ()
237- .setMessage (
238- "Added {} transactions to the pool in {}ms, {} not added, current pool stats {}" )
201+ .setMessage ("Added {} transactions to the pool in {}, {} not added, current pool stats {}" )
239202 .addArgument (addedTransactions ::size )
240- .addArgument (() -> System .currentTimeMillis () - started )
203+ .addArgument (() -> Duration . ofNanos ( System .nanoTime () - started ) )
241204 .addArgument (() -> initialCount - addedTransactions .size ())
242205 .addArgument (pendingTransactions ::logStats )
243206 .log ();
@@ -249,9 +212,10 @@ public Map<Hash, ValidationResult<TransactionInvalidReason>> addRemoteTransactio
249212 }
250213
251214 private ValidationResult <TransactionInvalidReason > addTransaction (
252- final Transaction baseTransaction , final boolean isLocal , final byte score ) {
253-
254- final boolean hasPriority = isPriorityTransaction (baseTransaction , isLocal );
215+ final Transaction baseTransaction ,
216+ final boolean isLocal ,
217+ final boolean hasPriority ,
218+ final byte score ) {
255219
256220 if (pendingTransactions .containsTransaction (baseTransaction )) {
257221 LOG .atTrace ()
@@ -649,7 +613,7 @@ static ValidationResultAndAccount invalid(final TransactionInvalidReason reason)
649613
650614 public CompletableFuture <Void > setEnabled () {
651615 if (!isEnabled ()) {
652- initLogForReplay ( );
616+ logStart ( configuration , getChainHeadBlockHeader () );
653617 pendingTransactions = pendingTransactionsSupplier .get ();
654618 pendingTransactionsListenersProxy .subscribe ();
655619 isPoolEnabled .set (true );
@@ -696,7 +660,7 @@ public CompletableFuture<Void> setDisabled() {
696660 return null ;
697661 });
698662 pendingTransactions = new DisabledPendingTransactions ();
699- LOG_FOR_REPLAY . trace ( "STOP" );
663+ logStop ( );
700664 return saveOperation ;
701665 }
702666 return CompletableFuture .completedFuture (null );
@@ -918,9 +882,9 @@ private void executeLoadFromDisk() {
918882 Bytes .fromBase64String (
919883 line .substring (scoreStr .length () + 1 ))),
920884 EncodingContext .POOLED_TRANSACTION );
921-
885+ final boolean hasPriority = isPriorityTransaction ( tx , isLocal );
922886 final ValidationResult <TransactionInvalidReason > result =
923- addTransaction (tx , isLocal , score );
887+ addTransaction (tx , isLocal , hasPriority , score );
924888 return result .isValid () ? "OK" : result .getInvalidReason ().name ();
925889 })
926890 .collect (Collectors .groupingBy (Function .identity (), Collectors .counting ()));
0 commit comments