@@ -71,7 +71,6 @@ impl<'a> Transaction<'a> {
7171 Ok ( ( ) )
7272 }
7373
74- // TODO deprecate this and move the logic to TransactionAction
7574 fn apply (
7675 & mut self ,
7776 updates : Vec < TableUpdate > ,
@@ -193,29 +192,52 @@ impl<'a> Transaction<'a> {
193192 Ok ( SetLocation :: new ( self ) )
194193 }
195194
195+ fn refresh ( & mut self , refreshed : Table ) {
196+ self . base_table = & refreshed;
197+ self . current_table = refreshed. clone ( ) ;
198+ }
199+
196200 /// Commit transaction.
197- pub async fn commit ( self , catalog : & dyn Catalog ) -> Result < Table > {
198- let table_commit = TableCommit :: builder ( )
199- . ident ( self . base_table . identifier ( ) . clone ( ) )
200- . updates ( self . updates )
201- . requirements ( self . requirements )
202- . build ( ) ;
201+ pub async fn commit ( mut self , catalog : & dyn Catalog ) -> Result < Table > {
202+ // let table_commit = TableCommit::builder()
203+ // .ident(self.base_table.identifier().clone())
204+ // .updates(self.updates)
205+ // .requirements(self.requirements)
206+ // .build();
203207 if self . base_table . metadata ( ) == self . current_table . metadata ( ) {
204208 return Ok ( self . current_table ) ;
205209 }
206210
207211 // TODO add refresh() in catalog?
208- let refreshed_table = catalog
209- . load_table ( table_commit . identifier ( ) )
212+ let refreshed = catalog
213+ . load_table ( self . base_table . identifier ( ) )
210214 . await
211- . expect ( format ! ( "Failed to refresh table {}" , table_commit . identifier( ) ) . as_str ( ) ) ;
215+ . expect ( format ! ( "Failed to refresh table {}" , self . base_table . identifier( ) ) . as_str ( ) ) ;
212216
213- if self . base_table . metadata ( ) != refreshed_table. metadata ( ) {
214- // TODO raise a real error and retry
215- panic ! ( "Stale base table!" )
217+ if self . base_table . metadata ( ) != refreshed. metadata ( )
218+ || self . base_table . metadata_location ( ) != refreshed. metadata_location ( )
219+ {
220+ self . refresh ( refreshed) ;
221+ self . apply ( self . updates , self . requirements ) // TODO need create new requirements based on the refreshed table
222+ . expect ( "Failed to re-apply updates" ) ; // re-apply updates
223+ // TODO retry on this error
224+ return Err ( Error :: new (
225+ ErrorKind :: DataInvalid ,
226+ "Cannot commit: stale base table metadata" . to_string ( ) ,
227+ ) ) ;
216228 }
217229
218- catalog. update_table ( table_commit) . await
230+ if self . base_table . metadata ( ) == self . current_table . metadata ( )
231+ && self . base_table . metadata_location ( ) == self . current_table . metadata_location ( )
232+ {
233+ // nothing to commit, return current table
234+ return Ok ( self . current_table ) ;
235+ }
236+
237+ catalog
238+ . commit_table ( self . base_table , self . current_table )
239+ . await
240+ // catalog.update_table(table_commit).await
219241 }
220242}
221243
@@ -229,8 +251,8 @@ mod tests {
229251 use crate :: spec:: { FormatVersion , TableMetadata } ;
230252 use crate :: table:: Table ;
231253 use crate :: transaction:: Transaction ;
232- use crate :: { TableIdent , TableUpdate } ;
233254 use crate :: transaction:: action:: TransactionAction ;
255+ use crate :: { TableIdent , TableUpdate } ;
234256
235257 fn make_v1_table ( ) -> Table {
236258 let file = File :: open ( format ! (
@@ -368,7 +390,7 @@ mod tests {
368390 . set_location ( )
369391 . unwrap ( )
370392 . set_location ( String :: from ( "s3://bucket/prefix/new_table" ) ) ;
371-
393+
372394 let tx = set_location. commit ( ) . unwrap ( ) ;
373395
374396 assert_eq ! (
0 commit comments