2222import io .netty .buffer .ByteBuf ;
2323import io .netty .buffer .Unpooled ;
2424import io .netty .util .Recycler ;
25- import io .netty .util .Recycler .Handle ;
26- import io .netty .util .ReferenceCounted ;
2725import org .apache .bookkeeper .client .api .LedgerEntry ;
26+ import org .apache .bookkeeper .client .impl .LedgerEntryImpl ;
2827import org .apache .bookkeeper .mledger .Entry ;
2928import org .apache .bookkeeper .mledger .Position ;
30- import org .apache .bookkeeper .mledger .PositionFactory ;
31- import org .apache .bookkeeper .mledger .util .AbstractCASReferenceCounted ;
32- import org .apache .bookkeeper .mledger .util .RangeCache ;
33-
34- public final class EntryImpl extends AbstractCASReferenceCounted implements Entry , Comparable <EntryImpl >,
35- RangeCache .ValueWithKeyValidation <Position > {
29+ import org .apache .bookkeeper .mledger .intercept .ManagedLedgerInterceptor ;
3630
31+ public final class EntryImpl extends AbstractEntryImpl <EntryImpl > {
3732 private static final Recycler <EntryImpl > RECYCLER = new Recycler <EntryImpl >() {
3833 @ Override
3934 protected EntryImpl newObject (Handle <EntryImpl > handle ) {
4035 return new EntryImpl (handle );
4136 }
4237 };
4338
44- private final Handle <EntryImpl > recyclerHandle ;
45- private long timestamp ;
46- private long ledgerId ;
47- private long entryId ;
48- private Position position ;
49- ByteBuf data ;
50-
51- private Runnable onDeallocate ;
39+ public static EntryImpl create (LedgerEntry ledgerEntry , ManagedLedgerInterceptor interceptor ) {
40+ ManagedLedgerInterceptor .PayloadProcessorHandle processorHandle = null ;
41+ if (interceptor != null ) {
42+ ByteBuf duplicateBuffer = ledgerEntry .getEntryBuffer ().retainedDuplicate ();
43+ processorHandle = interceptor
44+ .processPayloadBeforeEntryCache (duplicateBuffer );
45+ if (processorHandle != null ) {
46+ ledgerEntry = LedgerEntryImpl .create (ledgerEntry .getLedgerId (), ledgerEntry .getEntryId (),
47+ ledgerEntry .getLength (), processorHandle .getProcessedPayload ());
48+ } else {
49+ duplicateBuffer .release ();
50+ }
51+ }
52+ EntryImpl returnEntry = create (ledgerEntry );
53+ if (processorHandle != null ) {
54+ processorHandle .release ();
55+ ledgerEntry .close ();
56+ }
57+ return returnEntry ;
58+ }
5259
5360 public static EntryImpl create (LedgerEntry ledgerEntry ) {
5461 EntryImpl entry = RECYCLER .get ();
5562 entry .timestamp = System .nanoTime ();
5663 entry .ledgerId = ledgerEntry .getLedgerId ();
5764 entry .entryId = ledgerEntry .getEntryId ();
58- entry .data = ledgerEntry .getEntryBuffer ();
59- entry .data .retain ();
65+ entry .setDataBuffer (ledgerEntry .getEntryBuffer ().retain ());
6066 entry .setRefCnt (1 );
6167 return entry ;
6268 }
@@ -67,7 +73,7 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
6773 entry .timestamp = System .nanoTime ();
6874 entry .ledgerId = ledgerId ;
6975 entry .entryId = entryId ;
70- entry .data = Unpooled .wrappedBuffer (data );
76+ entry .setDataBuffer ( Unpooled .wrappedBuffer (data ) );
7177 entry .setRefCnt (1 );
7278 return entry ;
7379 }
@@ -77,8 +83,7 @@ public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {
7783 entry .timestamp = System .nanoTime ();
7884 entry .ledgerId = ledgerId ;
7985 entry .entryId = entryId ;
80- entry .data = data ;
81- entry .data .retain ();
86+ entry .setDataBuffer (data .retain ());
8287 entry .setRefCnt (1 );
8388 return entry ;
8489 }
@@ -88,128 +93,22 @@ public static EntryImpl create(Position position, ByteBuf data) {
8893 entry .timestamp = System .nanoTime ();
8994 entry .ledgerId = position .getLedgerId ();
9095 entry .entryId = position .getEntryId ();
91- entry .data = data ;
92- entry .data .retain ();
96+ entry .setDataBuffer (data .retain ());
9397 entry .setRefCnt (1 );
9498 return entry ;
9599 }
96100
97- public static EntryImpl create (EntryImpl other ) {
101+ public static EntryImpl create (Entry other ) {
98102 EntryImpl entry = RECYCLER .get ();
99103 entry .timestamp = System .nanoTime ();
100- entry .ledgerId = other .ledgerId ;
101- entry .entryId = other .entryId ;
102- entry .data = other .data .retainedDuplicate ();
104+ entry .ledgerId = other .getLedgerId () ;
105+ entry .entryId = other .getEntryId () ;
106+ entry .setDataBuffer ( other .getDataBuffer () .retainedDuplicate () );
103107 entry .setRefCnt (1 );
104108 return entry ;
105109 }
106110
107111 private EntryImpl (Recycler .Handle <EntryImpl > recyclerHandle ) {
108- this .recyclerHandle = recyclerHandle ;
109- }
110-
111- public void onDeallocate (Runnable r ) {
112- if (this .onDeallocate == null ) {
113- this .onDeallocate = r ;
114- } else {
115- // this is not expected to happen
116- Runnable previous = this .onDeallocate ;
117- this .onDeallocate = () -> {
118- try {
119- previous .run ();
120- } finally {
121- r .run ();
122- }
123- };
124- }
125- }
126-
127- public long getTimestamp () {
128- return timestamp ;
129- }
130-
131- @ Override
132- public ByteBuf getDataBuffer () {
133- return data ;
134- }
135-
136- @ Override
137- public byte [] getData () {
138- byte [] array = new byte [data .readableBytes ()];
139- data .getBytes (data .readerIndex (), array );
140- return array ;
141- }
142-
143- // Only for test
144- @ Override
145- public byte [] getDataAndRelease () {
146- byte [] array = getData ();
147- release ();
148- return array ;
149- }
150-
151- @ Override
152- public int getLength () {
153- return data .readableBytes ();
154- }
155-
156- @ Override
157- public Position getPosition () {
158- if (position == null ) {
159- position = PositionFactory .create (ledgerId , entryId );
160- }
161- return position ;
162- }
163-
164- @ Override
165- public long getLedgerId () {
166- return ledgerId ;
167- }
168-
169- @ Override
170- public long getEntryId () {
171- return entryId ;
172- }
173-
174- @ Override
175- public int compareTo (EntryImpl other ) {
176- if (this .ledgerId != other .ledgerId ) {
177- return this .ledgerId < other .ledgerId ? -1 : 1 ;
178- }
179-
180- if (this .entryId != other .entryId ) {
181- return this .entryId < other .entryId ? -1 : 1 ;
182- }
183-
184- return 0 ;
185- }
186-
187- @ Override
188- public ReferenceCounted touch (Object hint ) {
189- return this ;
190- }
191-
192- @ Override
193- protected void deallocate () {
194- // This method is called whenever the ref-count of the EntryImpl reaches 0, so that now we can recycle it
195- if (onDeallocate != null ) {
196- try {
197- onDeallocate .run ();
198- } finally {
199- onDeallocate = null ;
200- }
201- }
202- data .release ();
203- data = null ;
204- timestamp = -1 ;
205- ledgerId = -1 ;
206- entryId = -1 ;
207- position = null ;
208- recyclerHandle .recycle (this );
209- }
210-
211- @ Override
212- public boolean matchesKey (Position key ) {
213- return key .compareTo (ledgerId , entryId ) == 0 ;
112+ super (recyclerHandle );
214113 }
215114}
0 commit comments