|
| 1 | +import pytest |
| 2 | +from unittest.mock import Mock |
| 3 | + |
| 4 | +from hiero_sdk_python.account.account_id import AccountId |
| 5 | +from hiero_sdk_python.hapi.services import timestamp_pb2 |
| 6 | +from hiero_sdk_python.tokens.nft_id import NftId |
| 7 | +from hiero_sdk_python.tokens.token_id import TokenId |
| 8 | +from hiero_sdk_python.tokens.token_airdrop_claim import TokenClaimAirdropTransaction |
| 9 | +from hiero_sdk_python.tokens.token_airdrop_pending_id import PendingAirdropId |
| 10 | +from hiero_sdk_python.hapi.services import transaction_body_pb2 |
| 11 | +from hiero_sdk_python.hapi.services.token_claim_airdrop_pb2 import ( # pylint: disable=no-name-in-module |
| 12 | + TokenClaimAirdropTransactionBody, |
| 13 | +) |
| 14 | +from hiero_sdk_python.transaction.transaction_id import TransactionId |
| 15 | +from hiero_sdk_python.transaction.transaction_id import TransactionId |
| 16 | + |
| 17 | +pytestmark = pytest.mark.unit |
| 18 | + |
| 19 | +def _make_fungible_pending(sender: AccountId, receiver: AccountId, num: int) -> PendingAirdropId: |
| 20 | + return PendingAirdropId(sender, receiver, TokenId(0, 0, num), None) |
| 21 | + |
| 22 | +def _make_nft_pending(sender: AccountId, receiver: AccountId, num: int, serial: int) -> PendingAirdropId: |
| 23 | + return PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, num), serial)) |
| 24 | + |
| 25 | +def test_add_pending_airdrop_id(): |
| 26 | + """Test adding one pending fungible airdrop id using chaining method""" |
| 27 | + sender = AccountId(0, 0, 1001) |
| 28 | + receiver = AccountId(0, 0, 1002) |
| 29 | + |
| 30 | + pending_airdrop_fungible_1 = _make_fungible_pending(sender, receiver, 1000) |
| 31 | + |
| 32 | + tx_claim = TokenClaimAirdropTransaction() |
| 33 | + chained = tx_claim.add_pending_airdrop_id(pending_airdrop_fungible_1) |
| 34 | + assert chained is tx_claim # chaining should return same instance |
| 35 | + |
| 36 | + ids = tx_claim.get_pending_airdrop_ids() |
| 37 | + assert isinstance(ids, list) |
| 38 | + assert len(ids) == 1 |
| 39 | + assert ids[0] == pending_airdrop_fungible_1 |
| 40 | + |
| 41 | +def test_add_pending_airdrop_id_nft(): |
| 42 | + """Test adding one pending NFT airdrop id using chaining method""" |
| 43 | + sender = AccountId(0, 0, 2001) |
| 44 | + receiver = AccountId(0, 0, 2002) |
| 45 | + |
| 46 | + pending_airdrop_nft_1 = _make_nft_pending(sender, receiver, 2000, 1) |
| 47 | + |
| 48 | + tx_claim = TokenClaimAirdropTransaction() |
| 49 | + chained = tx_claim.add_pending_airdrop_id(pending_airdrop_nft_1) |
| 50 | + assert chained is tx_claim # chaining should return same instance |
| 51 | + |
| 52 | + ids = tx_claim.get_pending_airdrop_ids() |
| 53 | + assert isinstance(ids, list) |
| 54 | + assert len(ids) == 1 |
| 55 | + assert ids[0] == pending_airdrop_nft_1 |
| 56 | + |
| 57 | +def test_add_pending_airdrop_ids_mixed_fungible_and_nft(): |
| 58 | + """Claim one fungible and one NFT pending airdrop in a single transaction.""" |
| 59 | + sender = AccountId(0, 0, 3001) |
| 60 | + receiver = AccountId(0, 0, 3002) |
| 61 | + |
| 62 | + fungible = _make_fungible_pending(sender, receiver, 3000) # token num=3000 |
| 63 | + nft = _make_nft_pending(sender, receiver, 4000, 1) # token num=4000, serial=1 |
| 64 | + |
| 65 | + tx_claim = TokenClaimAirdropTransaction() |
| 66 | + tx_claim.add_pending_airdrop_id(fungible).add_pending_airdrop_id(nft) |
| 67 | + |
| 68 | + ids = tx_claim.get_pending_airdrop_ids() |
| 69 | + assert isinstance(ids, list) |
| 70 | + assert len(ids) == 2 |
| 71 | + |
| 72 | + # Order should be preserved: [fungible, nft] |
| 73 | + assert ids[0] == fungible |
| 74 | + assert ids[1] == nft |
| 75 | + |
| 76 | +def test_add_pending_airdrop_ids_multiple_mixed_dynamic(): |
| 77 | + """Test adding several fungible + NFT pending airdrop IDs built dynamically.""" |
| 78 | + sender = AccountId(0, 0, 6201) |
| 79 | + receiver = AccountId(0, 0, 6202) |
| 80 | + |
| 81 | + pending_ids = [] |
| 82 | + # Add fungible IDs |
| 83 | + for token_num in (6200, 6201): |
| 84 | + pending_ids.append(PendingAirdropId(sender, receiver, TokenId(0, 0, token_num), None)) |
| 85 | + # Add NFT IDs |
| 86 | + for serial in (1, 2): |
| 87 | + pending_ids.append(PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, 7200), serial))) |
| 88 | + |
| 89 | + tx_claim = TokenClaimAirdropTransaction() |
| 90 | + tx_claim.add_pending_airdrop_ids(pending_ids) |
| 91 | + |
| 92 | + ids = tx_claim.get_pending_airdrop_ids() |
| 93 | + assert ids == pending_ids |
| 94 | + |
| 95 | +def test_cannot_exceed_max_airdrops(): |
| 96 | + """ Tests that 10 airdrops is fine but anything more not""" |
| 97 | + sender = AccountId(0, 0, 8001) |
| 98 | + receiver = AccountId(0, 0, 8002) |
| 99 | + tx = TokenClaimAirdropTransaction() |
| 100 | + |
| 101 | + items = [PendingAirdropId(sender, receiver, TokenId(0, 0, 8000 + i), None) |
| 102 | + for i in range(tx.MAX_IDS)] |
| 103 | + tx.add_pending_airdrop_ids(items) |
| 104 | + assert len(tx.get_pending_airdrop_ids()) == tx.MAX_IDS |
| 105 | + |
| 106 | + with pytest.raises(ValueError): |
| 107 | + tx.add_pending_airdrop_id(PendingAirdropId(sender, receiver, TokenId(0, 0, 9999), None)) #This would be 11 |
| 108 | + |
| 109 | +def test_add_batch_overflow_is_atomic(): |
| 110 | + sender_account = AccountId(0, 0, 9001) |
| 111 | + receiver_account = AccountId(0, 0, 9002) |
| 112 | + transaction_claim = TokenClaimAirdropTransaction() |
| 113 | + |
| 114 | + # Fill to exactly MAX_IDS - 1 |
| 115 | + initial_ids = [ |
| 116 | + PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9000 + i), None) |
| 117 | + for i in range(transaction_claim.MAX_IDS - 1) |
| 118 | + ] |
| 119 | + transaction_claim.add_pending_airdrop_ids(initial_ids) |
| 120 | + |
| 121 | + overflow_batch = [ |
| 122 | + PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9990), None), |
| 123 | + PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9991), 1)), |
| 124 | + ] |
| 125 | + |
| 126 | + before_ids = transaction_claim.get_pending_airdrop_ids() |
| 127 | + with pytest.raises(ValueError): |
| 128 | + transaction_claim.add_pending_airdrop_ids(overflow_batch) |
| 129 | + after_ids = transaction_claim.get_pending_airdrop_ids() |
| 130 | + |
| 131 | + assert after_ids == before_ids |
| 132 | + |
| 133 | +def test_min_ids_enforced_on_build_hits_validation(): |
| 134 | + """ Tests that at least one airdrop is required to claim""" |
| 135 | + transaction_claim = TokenClaimAirdropTransaction() |
| 136 | + transaction_claim.transaction_id = TransactionId(AccountId(0, 0, 9999), timestamp_pb2.Timestamp(seconds=1)) |
| 137 | + transaction_claim.node_account_id = AccountId(0, 0, 3) |
| 138 | + |
| 139 | + with pytest.raises(ValueError): |
| 140 | + transaction_claim.build_transaction_body() |
| 141 | + |
| 142 | +def test_rejects_duplicate_fungible(): |
| 143 | + sender = AccountId(0, 0, 8101) |
| 144 | + receiver = AccountId(0, 0, 8102) |
| 145 | + |
| 146 | + f1 = PendingAirdropId(sender, receiver, TokenId(0, 0, 8100), None) |
| 147 | + f2 = PendingAirdropId(sender, receiver, TokenId(0, 0, 8100), None) # duplicate |
| 148 | + |
| 149 | + tx = TokenClaimAirdropTransaction().add_pending_airdrop_id(f1) |
| 150 | + |
| 151 | + with pytest.raises(ValueError): |
| 152 | + tx.add_pending_airdrop_ids([f2]) |
| 153 | + |
| 154 | + # List should remain unchanged because it should deduplicate |
| 155 | + ids = tx.get_pending_airdrop_ids() |
| 156 | + assert ids == [f1] |
| 157 | + |
| 158 | +def test_rejects_duplicate_nft(): |
| 159 | + sender = AccountId(0, 0, 8201) |
| 160 | + receiver = AccountId(0, 0, 8202) |
| 161 | + |
| 162 | + n1 = PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, 8200), 1)) |
| 163 | + n2 = PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, 8200), 1)) # duplicate |
| 164 | + |
| 165 | + tx = TokenClaimAirdropTransaction().add_pending_airdrop_id(n1) |
| 166 | + |
| 167 | + with pytest.raises(ValueError): |
| 168 | + tx.add_pending_airdrop_ids([n2]) |
| 169 | + |
| 170 | + # List should remain unchanged because it should deduplicate |
| 171 | + ids = tx.get_pending_airdrop_ids() |
| 172 | + assert ids == [n1] |
| 173 | + |
| 174 | +def test_build_transaction_body_populates_proto(): |
| 175 | + sender = AccountId(0, 0, 8401) |
| 176 | + receiver = AccountId(0, 0, 8402) |
| 177 | + |
| 178 | + fungible_airdrop = PendingAirdropId(sender, receiver, TokenId(0, 0, 8400), None) |
| 179 | + nft_airdrop = PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, 8405), 3)) |
| 180 | + |
| 181 | + tx_claim = TokenClaimAirdropTransaction().add_pending_airdrop_ids( |
| 182 | + [fungible_airdrop, nft_airdrop] |
| 183 | + ) |
| 184 | + |
| 185 | + # Satisfy base preconditions: set transaction_id and node_account_id |
| 186 | + tx_claim.transaction_id = TransactionId( |
| 187 | + sender, timestamp_pb2.Timestamp(seconds=1, nanos=0) |
| 188 | + ) |
| 189 | + tx_claim.node_account_id = AccountId(0, 0, 3) # dummy node account |
| 190 | + |
| 191 | + body: transaction_body_pb2.TransactionBody = tx_claim.build_transaction_body() |
| 192 | + |
| 193 | + claim = body.tokenClaimAirdrop |
| 194 | + assert isinstance(claim, TokenClaimAirdropTransactionBody) |
| 195 | + assert len(claim.pending_airdrops) == 2 |
| 196 | + |
| 197 | + expected = [a._to_proto().SerializeToString() for a in [fungible_airdrop, nft_airdrop]] |
| 198 | + actual = [a.SerializeToString() for a in claim.pending_airdrops] |
| 199 | + assert actual == expected |
| 200 | + |
| 201 | +def test_from_proto_round_trip(): |
| 202 | + sender_account = AccountId(0, 0, 9041) |
| 203 | + receiver_account = AccountId(0, 0, 9042) |
| 204 | + original_ids = [ |
| 205 | + PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9040), None), |
| 206 | + PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9045), 7)), |
| 207 | + ] |
| 208 | + proto_body = TokenClaimAirdropTransactionBody(pending_airdrops=[i._to_proto() for i in original_ids]) |
| 209 | + |
| 210 | + rebuilt = TokenClaimAirdropTransaction._from_proto(proto_body) # pylint: disable=protected-access |
| 211 | + assert rebuilt.get_pending_airdrop_ids() == original_ids |
| 212 | + |
| 213 | +def test_get_pending_airdrop_ids_returns_copy(): |
| 214 | + sender_account = AccountId(0, 0, 9021) |
| 215 | + receiver_account = AccountId(0, 0, 9022) |
| 216 | + airdrop_id = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9020), None) |
| 217 | + |
| 218 | + transaction_claim = TokenClaimAirdropTransaction().add_pending_airdrop_id(airdrop_id) |
| 219 | + snapshot = transaction_claim.get_pending_airdrop_ids() |
| 220 | + snapshot.append(PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9999), None)) |
| 221 | + |
| 222 | + assert transaction_claim.get_pending_airdrop_ids() == [airdrop_id] # unchanged |
| 223 | + |
| 224 | +def test_order_preserved_across_batched_adds(): |
| 225 | + sender_account = AccountId(0, 0, 9031) |
| 226 | + receiver_account = AccountId(0, 0, 9032) |
| 227 | + |
| 228 | + id_a = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9030), None) |
| 229 | + id_b = PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9035), 1)) |
| 230 | + id_c = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9031), None) |
| 231 | + id_d = PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9035), 2)) |
| 232 | + |
| 233 | + transaction_claim = TokenClaimAirdropTransaction() |
| 234 | + transaction_claim.add_pending_airdrop_ids([id_a, id_b]).add_pending_airdrop_ids([id_c]).add_pending_airdrop_ids([id_d]) |
| 235 | + |
| 236 | + assert transaction_claim.get_pending_airdrop_ids() == [id_a, id_b, id_c, id_d] |
| 237 | + |
| 238 | +def test_add_empty_list_is_noop(): |
| 239 | + sender_account = AccountId(0, 0, 9071) |
| 240 | + receiver_account = AccountId(0, 0, 9072) |
| 241 | + first_id = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9070), None) |
| 242 | + |
| 243 | + transaction_claim = TokenClaimAirdropTransaction().add_pending_airdrop_id(first_id) |
| 244 | + transaction_claim.add_pending_airdrop_ids([]) |
| 245 | + |
| 246 | + assert transaction_claim.get_pending_airdrop_ids() == [first_id] |
| 247 | + |
| 248 | +def test_from_proto_rejects_too_many(): |
| 249 | + sender_account = AccountId(0, 0, 9051) |
| 250 | + receiver_account = AccountId(0, 0, 9052) |
| 251 | + too_many = [PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9050 + i), None) |
| 252 | + for i in range(TokenClaimAirdropTransaction.MAX_IDS + 1)] |
| 253 | + body = TokenClaimAirdropTransactionBody(pending_airdrops=[x._to_proto() for x in too_many]) |
| 254 | + |
| 255 | + with pytest.raises(ValueError): |
| 256 | + TokenClaimAirdropTransaction._from_proto(body) # pylint: disable=protected-access |
| 257 | + |
| 258 | +def test_from_proto_rejects_duplicates(): |
| 259 | + sender_account = AccountId(0, 0, 9061) |
| 260 | + receiver_account = AccountId(0, 0, 9062) |
| 261 | + duplicate = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9060), None) |
| 262 | + body = TokenClaimAirdropTransactionBody(pending_airdrops=[duplicate._to_proto(), duplicate._to_proto()]) |
| 263 | + |
| 264 | + with pytest.raises(ValueError): |
| 265 | + TokenClaimAirdropTransaction._from_proto(body) # pylint: disable=protected-access |
| 266 | + |
| 267 | +def test_cannot_mutate_after_freeze_if_supported(mock_client): |
| 268 | + sender_account = AccountId(0, 0, 9081) |
| 269 | + receiver_account = AccountId(0, 0, 9082) |
| 270 | + |
| 271 | + transaction_claim = TokenClaimAirdropTransaction().add_pending_airdrop_id( |
| 272 | + PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9080), None) |
| 273 | + ) |
| 274 | + if hasattr(transaction_claim, "freeze_with"): |
| 275 | + transaction_claim.freeze_with(mock_client) |
| 276 | + with pytest.raises(Exception): |
| 277 | + transaction_claim.add_pending_airdrop_id( |
| 278 | + PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9085), 1)) |
| 279 | + ) |
0 commit comments