@@ -33,7 +33,7 @@ use crate::TableUpdate::UpgradeFormatVersion;
3333use crate :: error:: Result ;
3434use crate :: spec:: FormatVersion ;
3535use crate :: table:: Table ;
36- use crate :: transaction:: action:: { ActionElement , SetLocation } ;
36+ use crate :: transaction:: action:: { SetLocation , TransactionAction , PendingAction } ;
3737use crate :: transaction:: append:: FastAppendAction ;
3838use crate :: transaction:: sort_order:: ReplaceSortOrderAction ;
3939use crate :: { Catalog , Error , ErrorKind , TableCommit , TableRequirement , TableUpdate } ;
@@ -42,7 +42,7 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat
4242pub struct Transaction < ' a > {
4343 base_table : & ' a Table ,
4444 current_table : Table ,
45- _actions : Vec < ActionElement < ' a > > , // TODO unused for now, should we use this to reapply actions?
45+ actions : Vec < PendingAction > ,
4646 updates : Vec < TableUpdate > ,
4747 requirements : Vec < TableRequirement > ,
4848}
@@ -53,12 +53,21 @@ impl<'a> Transaction<'a> {
5353 Self {
5454 base_table : table,
5555 current_table : table. clone ( ) ,
56- _actions : vec ! [ ] ,
56+ actions : vec ! [ ] ,
5757 updates : vec ! [ ] ,
5858 requirements : vec ! [ ] ,
5959 }
6060 }
6161
62+ pub fn refresh ( old_tx : Transaction < ' a > , refreshed : Table ) -> Result < Self > {
63+ let mut new_tx = Transaction :: new ( & refreshed. clone ( ) ) ;
64+ for action in & old_tx. actions {
65+ new_tx = action. commit ( new_tx) ?
66+ }
67+
68+ Ok ( new_tx)
69+ }
70+
6271 fn update_table_metadata ( & mut self , updates : & [ TableUpdate ] ) -> Result < ( ) > {
6372 let mut metadata_builder = self . current_table . metadata ( ) . clone ( ) . into_builder ( None ) ;
6473 for update in updates {
@@ -188,17 +197,12 @@ impl<'a> Transaction<'a> {
188197 }
189198
190199 /// Set the location of table
191- pub fn set_location ( self ) -> Result < SetLocation < ' a > > {
192- Ok ( SetLocation :: new ( self ) )
193- }
194-
195- fn refresh ( & mut self , refreshed : Table ) {
196- self . base_table = & refreshed;
197- self . current_table = refreshed. clone ( ) ;
200+ pub fn set_location ( self , location : String ) -> Result < Transaction < ' a > > {
201+ Ok ( SetLocation :: new ( ) . set_location ( location) . commit ( self ) ?)
198202 }
199203
200204 /// Commit transaction.
201- pub async fn commit ( mut self , catalog : & dyn Catalog ) -> Result < Table > {
205+ pub async fn commit ( mut self : Transaction < ' a > , catalog : & dyn Catalog ) -> Result < Table > {
202206 // let table_commit = TableCommit::builder()
203207 // .ident(self.base_table.identifier().clone())
204208 // .updates(self.updates)
@@ -217,14 +221,14 @@ impl<'a> Transaction<'a> {
217221 if self . base_table . metadata ( ) != refreshed. metadata ( )
218222 || self . base_table . metadata_location ( ) != refreshed. metadata_location ( )
219223 {
220- self . refresh ( refreshed ) ;
221- self . apply ( self . updates , self . requirements ) // TODO need create new requirements based on the refreshed table
222- . expect ( "Failed to re-apply updates" ) ; // re-apply updates
223- // TODO retry on this error
224- return Err ( Error :: new (
225- ErrorKind :: DataInvalid ,
226- "Cannot commit: stale base table metadata" . to_string ( ) ,
227- ) ) ;
224+ // refresh table
225+ let new_tx = Transaction :: refresh ( self , refreshed) ? ;
226+ return new_tx . commit ( catalog ) . await
227+ // TODO instead of refreshing directly, retry on this error
228+ // return Err(Error::new(
229+ // ErrorKind::DataInvalid,
230+ // "Cannot commit: stale base table metadata".to_string(),
231+ // ));
228232 }
229233
230234 if self . base_table . metadata ( ) == self . current_table . metadata ( )
@@ -386,12 +390,9 @@ mod tests {
386390 fn test_set_location ( ) {
387391 let table = make_v2_table ( ) ;
388392 let tx = Transaction :: new ( & table) ;
389- let set_location = tx
390- . set_location ( )
391- . unwrap ( )
392- . set_location ( String :: from ( "s3://bucket/prefix/new_table" ) ) ;
393-
394- let tx = set_location. commit ( ) . unwrap ( ) ;
393+ let tx = tx
394+ . set_location ( String :: from ( "s3://bucket/prefix/new_table" ) )
395+ . unwrap ( ) ;
395396
396397 assert_eq ! (
397398 vec![ TableUpdate :: SetLocation {
0 commit comments