diff --git a/bnr-utils/nzsyncbackup/connector/AZConnector.go b/bnr-utils/nzsyncbackup/connector/AZConnector.go new file mode 100644 index 0000000..baf8042 --- /dev/null +++ b/bnr-utils/nzsyncbackup/connector/AZConnector.go @@ -0,0 +1,360 @@ +package Connector + +import ( + "fmt" + "strings" + "net/url" + "time" + "os" + "path/filepath" + "path" + "context" + "log" + "github.com/Azure/azure-storage-blob-go/azblob" +) + +type iaz interface { + getServiceURL() (azblob.ServiceURL, error) + getContainerURL() (azblob.ContainerURL, error) + getBlockBlobURL(blobname string) (azblob.BlockBlobURL, error) + getBlobURL(blobname string) (azblob.BlobURL, error) + uploadFile(absfilepath string, relfilepath string, uniqueid string) (error) + downloadFile(outfilepath string, blobname string, streams uint, blockSize int64) error + +} + +type AZConnector struct { + Azaccount string + Azkey string + Azcontainer string + Streams uint + Blocksize int64 +} + +type job struct { + uniqueid string + bkpdir string +} + +type uploadJob struct { + job + absfilepath string +} + +type jobResult struct { + *job + err error +} + +type downloadJob struct { + conn AZConnector + blobname string + outfilepath string +} + +type downloadJobResult struct { + blobname string + err error +} + +func (j *uploadJob) uploadAZ(conn *AZConnector) error { + relfilepath, err := filepath.Rel(j.job.bkpdir, j.absfilepath) + if err != nil { + return fmt.Errorf("Unable to traverse %s, %s: %v", j.job.bkpdir, j.absfilepath, err) + } + + log.Println("Uploading file :", j.absfilepath) + return conn.uploadFile(j.absfilepath, relfilepath, j.job.uniqueid) +} + +func (cn *AZConnector) getServiceURL() (azblob.ServiceURL, error) { + var serviceURL azblob.ServiceURL + us := fmt.Sprintf("https://%s.blob.core.windows.net/", cn.Azaccount) + u, err := url.Parse(us) + if err != nil { + return serviceURL, fmt.Errorf("Unable to parse URL: %s : %v", us, err) + } + + credential, err := azblob.NewSharedKeyCredential(cn.Azaccount, cn.Azkey) + if err != nil { + return serviceURL, fmt.Errorf("Unable to create shared credentials: %v", err) + } + + p := azblob.NewPipeline(credential, azblob.PipelineOptions{ + Retry: azblob.RetryOptions{ + TryTimeout: 5 * time.Minute, + }, + }) + + serviceURL = azblob.NewServiceURL(*u, p) + return serviceURL, nil +} + +func (cn *AZConnector) getContainerURL() (azblob.ContainerURL, error) { + var containerURL azblob.ContainerURL + serviceURL, err := cn.getServiceURL() + if err == nil { + containerURL = serviceURL.NewContainerURL(cn.Azcontainer) + } + return containerURL, err +} + +func (cn *AZConnector) getBlobURL(blobname string) (azblob.BlobURL, error) { + var blobURL azblob.BlobURL + containerURL, err := cn.getContainerURL() + if err == nil { + blobURL = containerURL.NewBlobURL(blobname) + } + return blobURL, err +} + +func (cn *AZConnector) getBlockBlobURL(blobname string) (azblob.BlockBlobURL, error) { + var blockBlobURL azblob.BlockBlobURL + containerURL, err := cn.getContainerURL() + if err == nil { + blockBlobURL = containerURL.NewBlockBlobURL(blobname) + } + return blockBlobURL, err +} + +func (cn *AZConnector) uploadFile(absfilepath string, relfilepath string, uniqueid string) (error){ + // Upload the file to a block blob + blockBlobURL, err := cn.getBlockBlobURL(uniqueid+"/"+relfilepath) + if err != nil { + return err + } + + file, err := os.Open(absfilepath) + if err != nil { + return fmt.Errorf("Error in opening backup file : %v", err) + } + + _, err = azblob.UploadFileToBlockBlob(context.Background(), file, blockBlobURL, + azblob.UploadToBlockBlobOptions{ + BlockSize: int64(cn.Blocksize * 1024 * 1024), + Parallelism: uint16(cn.Streams), + }) + + return err +} + + +func (cn *AZConnector) Upload( otherargs *OtherArgs, backupinfo *BackupInfo ) (error){ + var err error + dirlist := strings.Split(backupinfo.Dir," ") + for _, bkpdir := range dirlist { + log.Println("Uploading backup data to azure cloud from backup dir", bkpdir) + backupdir := filepath.Join(bkpdir, "Netezza", backupinfo.npshost, backupinfo.dbname, backupinfo.backupset, backupinfo.increment) + _, err = os.Stat(backupdir) + if err != nil { + return fmt.Errorf("Cannot access directory '%s': %v. Please check if DB name, hostname are correct.", backupdir, err) + } + work := make(chan *uploadJob, otherargs.paralleljobs) + result := make(chan *jobResult, otherargs.paralleljobs) + done := make(chan bool) + + go func() { + for { + select { + case j, ok := <- work: + if ! ok { + // done + close(result) + return + } + err := j.uploadAZ(cn) + jr := jobResult{ job:&j.job, err:err } + result <- &jr + } + } + }() + + filesuploaded := 0 + go func() { + for { + select { + case r, ok := <- result: + if ! ok { + // work done + done <- true + return + } + if r.err != nil { + // stopping right here so that we + // don't keep on uploading when one has failed + log.Println("Error while uploading file. Ensure azure storage account name, azure key and container name are correct. If error persists contact IBM support team.", *r.job) + log.Fatalf("Azure storage account:%s accessing container:%s failed with error: %v", cn.Azaccount, cn.Azcontainer, r.err) + } + filesuploaded++ // this is fine, since this is single threaded increment + } + } + }() + + err = filepath.Walk(backupdir, + func(absfilepath string, info os.FileInfo, err error) error { + if info.IsDir() { + return nil + } + j := uploadJob{ job: job{otherargs.uniqueid, bkpdir}, absfilepath: absfilepath } + work <- &j // this will hang until at least one of the prior uploads finish if other.paralleljobs + // are already running + return err + }) + close(work) + <- done + if (err != nil) { + return(fmt.Errorf("Error reading directory: %s: %v. Please check if DB name, hostname are correct.", backupdir, err)) + } + log.Println("Upload successful for Backup Dir :", bkpdir) + log.Println("Total files uploaded:", filesuploaded) + } + return err +} + +func (j *downloadJob) downloadAZ(conn *AZConnector) error { + + log.Println("Downloading file :", j.blobname) + return conn.downloadFile(j.outfilepath, j.blobname, conn.Streams, conn.Blocksize) +} + +func (cn *AZConnector) downloadFile(outfilepath string, blobname string, streams uint, blockSize int64) error { + + filehandle, err := os.Create(outfilepath) + if err != nil { + return fmt.Errorf("Error in creating file inside backup dir: %v",err) + } + + defer filehandle.Close() + + blobURL, err := cn.getBlobURL(blobname) + if err != nil { + return err + } + + // Perform download + err = azblob.DownloadBlobToFile(context.Background(), blobURL, 0, 0, filehandle, + azblob.DownloadFromBlobOptions{ + BlockSize: int64(blockSize * 1024 * 1024), + RetryReaderOptionsPerBlock: azblob.RetryReaderOptions{MaxRetryRequests: 20}, + Parallelism: uint16(streams), + }) + if err != nil { + return fmt.Errorf("Error in downloading an Azure blob to a file: %v",err) + } + return err +} + + +func (cn *AZConnector) Download(otherargs *OtherArgs, backupinfo *BackupInfo) (error){ + var err error + log.Println("Downloading backup data from azure cloud to backup dir", backupinfo.Dir) + outdir := backupinfo.Dir + locations:= []string{} + contents:= []string{} + work := make(chan *downloadJob, otherargs.paralleljobs) + result := make(chan *downloadJobResult, otherargs.paralleljobs) + done := make(chan bool) + + blobpath := filepath.Join(otherargs.uniqueid, "Netezza",backupinfo.npshost, backupinfo.dbname, backupinfo.backupset, backupinfo.increment) + + // start the workers + go func() { + for { + select { + case j, ok := <- work: + if ! ok { + // done + close(result) + return + } + err := j.downloadAZ(cn) + jr := downloadJobResult{ blobname:j.blobname, err:err } + result <- &jr + } + } + }() + + filesdownloaded := 0 + go func() { + for { + select { + case r, ok := <- result: + if ! ok { + // work done + done <- true + return + } + if r.err != nil { + // stopping right here so that we + // don't keep on uploading when one has failed + log.Fatalf("%s: %v", r.blobname, r.err) + } + filesdownloaded++ // this is fine, since this is single threaded increment + } + } + }() + + + blobfound := 0 + + for marker := (azblob.Marker{}); marker.NotDone(); { + containerURL, err := cn.getContainerURL() + if err != nil { + return err + } + + // Get a result segment starting with the blob indicated by the current Marker. + listBlob, err := containerURL.ListBlobsFlatSegment(context.Background(), marker, azblob.ListBlobsSegmentOptions{}) + if err != nil { + return fmt.Errorf("Unable to list segment of blobs with storage account:%s and container:%s. Ensure azure storage account and container are correct.\n Error details: %v",cn.Azaccount, cn.Azcontainer, err) + } + + // ListBlobs returns the start of the next segment; you MUST use this to get + // the next segment (after processing the current result segment). + marker = listBlob.NextMarker + // Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute) + for _, blobInfo := range listBlob.Segment.BlobItems { + if strings.HasPrefix(blobInfo.Name, blobpath) { + + // Set up file to download the blob to + dir, filename := filepath.Split(blobInfo.Name) + + relfilepath, err := filepath.Rel(otherargs.uniqueid,dir) + if err != nil { + return fmt.Errorf("Error in fetching download relative path: %v",err) + } + + dumpdir := filepath.Join(outdir, relfilepath) + err = os.MkdirAll(dumpdir, 0777) + if err != nil { + return fmt.Errorf("Error in creating backup directory structure: %v",err) + } + + switch filename { + case "locations.txt": + locations = append(locations, path.Join(dumpdir, filename)) + case "contents.txt": + contents = append(contents, path.Join(dumpdir, filename)) + } + + outfilepath := path.Join(dumpdir, filename) + + j := downloadJob{ conn:*cn, outfilepath:outfilepath, blobname: blobInfo.Name } + work <- &j + + blobfound-- + } + } + + if blobfound == 0 { + return fmt.Errorf("No matching blob found. Please check if DB name, hostname, uniqueid or containername are correct. If error persists contact IBM support team. Azaccount:%s AzContainer:%s Blobpath:%s, Uniqueid:%s", cn.Azaccount, cn.Azcontainer, blobpath, otherargs.uniqueid) + } + } + close(work) + <- done + log.Println("File Downloaded to dir :", outdir) + log.Println("Total files downloaded:", filesdownloaded) + updateLocation(locations,outdir) + updateContents(contents) + return err +} diff --git a/bnr-utils/nzsyncbackup/connector/Connector.go b/bnr-utils/nzsyncbackup/connector/Connector.go new file mode 100644 index 0000000..0b70dba --- /dev/null +++ b/bnr-utils/nzsyncbackup/connector/Connector.go @@ -0,0 +1,177 @@ +package Connector + +import ( + "log" + "fmt" + "time" + "io" + "io/ioutil" + "os" + "path" + "strings" + "github.com/spf13/cobra" +) + +type IConnector interface { + Upload(*OtherArgs, *BackupInfo) (error) + Download( *OtherArgs, *BackupInfo) (error) +} + +type BackupInfo struct { + dbname string + Dir string + npshost string + backupset string + increment string +} + +type OtherArgs struct { + uniqueid string + logfiledir string + paralleljobs int + Operation int + upload *bool + download *bool + Connector string +} + +func ParseArgs(backupinfo *BackupInfo, otherargs *OtherArgs, azconnect *AZConnector, s3connect *S3connector) { + var cmdAws = &cobra.Command{ + Use : "aws", + Short: "Upload/Download Backup to/from AWS Cloud", + Run: func(cmd *cobra.Command, args []string) { + otherargs.Connector = "aws" + }, + } + var cmdAzure = &cobra.Command{ + Use : "azure", + Short: "Upload/Download Backup to/From Azure Cloud", + Run: func(cmd *cobra.Command, args []string) { + otherargs.Connector = "azure" + }, + } + + + var rootCmd = &cobra.Command{Use: "nzsyncbackup"} + + + rootCmd.PersistentFlags().StringVar(&backupinfo.dbname, "db", "", "Database name") + rootCmd.PersistentFlags().StringVar(&backupinfo.Dir, "dir", "", "Full path to the directory in which the backup already exists or should be downloaded") + rootCmd.PersistentFlags().StringVar(&backupinfo.npshost, "npshost", "", "Name of the NPS host as it appears in the backups") + rootCmd.PersistentFlags().StringVar(&backupinfo.backupset, "backupset", "", "Name of the backupset to be uploaded/downloaded") + rootCmd.PersistentFlags().StringVar(&backupinfo.increment, "increment", "", "Increment Number to be uploaded/downloaded") + rootCmd.PersistentFlags().StringVar(&otherargs.uniqueid,"uniqueid", "", "Azure blob storage container") + rootCmd.PersistentFlags().StringVar(&otherargs.logfiledir,"logfiledir", "/tmp", "Logfile directory for this utility. Default is /tmp dir") + otherargs.upload = rootCmd.PersistentFlags().Bool("upload", false, "Upload Backup to Cloud") + otherargs.download = rootCmd.PersistentFlags().Bool("download", false, "Download backup from cloud") + rootCmd.PersistentFlags().IntVar(&otherargs.paralleljobs,"paralleljobs",6,"Number of parallel files to upload/download") + + + + + cmdAws.Flags().StringVar(&s3connect.Access_key_id, "access-key", "", "The access key for the object store [AWS_ACCESS_KEY_ID] (required)") + cmdAws.Flags().StringVar(&s3connect.Bucket_url, "bucket-url", "", "The bucket url to store backups to (required)") + cmdAws.Flags().StringVar(&s3connect.Default_region, "region", "", "The region of the object store bucket (required)") + cmdAws.Flags().StringVar(&s3connect.Secret_access_key, "secret-key", "", "The secret key for the object store [AWS_SECRET_ACCESS_KEY] (required)") + cmdAws.Flags().StringVar(&s3connect.Endpoint, "endpoint", "", "The endpoint for object to be store on IBM cloud") + cmdAws.Flags().IntVar(&s3connect.Streams, "streams", 16, "Number of blocks to upload/download in parallel default 16") + cmdAws.Flags().Int64Var(&s3connect.Blocksize, "blocksize", 100, "Block size in MB to upload/download file") + + cmdAzure.Flags().StringVar(&azconnect.Azkey, "account-key", "", "The Azure Blob account key (required)") + cmdAzure.Flags().StringVar(&azconnect.Azaccount, "account-name", "", "The Azure Blob account name (required)") + cmdAzure.Flags().UintVar(&azconnect.Streams, "streams", 16, "Number of blocks to upload/download in parallel default 16") + cmdAzure.Flags().Int64Var(&azconnect.Blocksize, "blocksize", 100, "Block size in MB to upload/download file") + cmdAzure.Flags().StringVar(&azconnect.Azcontainer, "container", "", "The Azure Blob container name (required)") + cmdAws.MarkFlagRequired("region") + cmdAws.MarkFlagRequired("access-key") + cmdAws.MarkFlagRequired("bucket-name") + cmdAws.MarkFlagRequired("secret-key") + cmdAzure.MarkFlagRequired("account-key") + cmdAzure.MarkFlagRequired("account-name") + cmdAzure.MarkFlagRequired("container") + rootCmd.AddCommand(cmdAws, cmdAzure) + + if err := rootCmd.Execute(); err != nil { + log.Println(err) + os.Exit(1) + } +} + +func SetUpLogFile(backupinfo *BackupInfo, otherargs *OtherArgs) { + // log file configuration setup + logfilename := fmt.Sprintf("nz_%sConnector_%d_%s.log", otherargs.Connector, os.Getppid(), time.Now().Format("2006-01-02-150405")) + logfilepath := path.Join(otherargs.logfiledir, logfilename) + filehandle, err := os.OpenFile(logfilepath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + fmt.Errorf("Error in opening logfile: %v",err) + } + w := io.MultiWriter(os.Stdout, filehandle) + log.SetOutput(w) + prefixStr := fmt.Sprintf("%s ", time.Now().UTC().Format("2006-01-02 15:04:05 EST")) + fmt.Sprintf("%-7s", "[INFO]") + log.SetFlags(0) + log.SetPrefix(prefixStr) + + log.Println("Backup/Restore directory :",backupinfo.Dir) + log.Println("DB name :", backupinfo.dbname) + log.Println("Nps hostname :", backupinfo.npshost) + log.Println("BackupsetID :", backupinfo.backupset) + log.Println("UniqueID :", otherargs.uniqueid) + log.Println("Number of files to upload/download in parallel :", otherargs.paralleljobs) +} + +func SetOperation(otherargs *OtherArgs) { + if (*otherargs.upload){ + otherargs.Operation = 0 + } + if (*otherargs.download){ + otherargs.Operation = 1 + } +} + +func updateLocation(arrLoc []string,outdir string){ + for _,locFile := range arrLoc{ + input, err := ioutil.ReadFile(locFile) + if err != nil { + log.Fatalf("Unable to open %s to read: %v\n", locFile, err) + } + lines := strings.Split(string(input), "\n") + if (len(lines) == 2 && !strings.HasSuffix(lines[len(lines) -2] , outdir)) { + f, err := os.OpenFile(locFile, + os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Fatalf("Unable to open %s for update: %v\n", locFile, err) + } + defer f.Close() + textAppend := "1,1,1," + outdir + "\n" + if _, err := f.WriteString(textAppend); err != nil { + log.Fatalf("Unable to update %s: %v\n", locFile, err) + } + } + } +} + +func updateContents(arrContents []string){ + for _,contentFile := range arrContents{ + input, err := ioutil.ReadFile(contentFile) + if err != nil { + log.Fatalln("Unable to open %s to read: %v\n",contentFile,err) + } + + lines := strings.Split(string(input), "\n") + var textline []string + for i := 0 ; i < len(lines) ; i++ { + line := lines[i] + token := strings.Split(line, ",") + if ( token[len(token)-1] == "0" ) { + token[len(token)-1] = "1" + } + textline = append(textline, strings.Join(token, ",")) + } + output := strings.Join(textline, "\n") + err = ioutil.WriteFile(contentFile, []byte(output), 0644) + if err != nil { + log.Fatalln("Unable to update %s: %v\n",contentFile,err) + } + } +} + diff --git a/bnr-utils/nzsyncbackup/connector/S3Connector.go b/bnr-utils/nzsyncbackup/connector/S3Connector.go new file mode 100644 index 0000000..59d64b3 --- /dev/null +++ b/bnr-utils/nzsyncbackup/connector/S3Connector.go @@ -0,0 +1,321 @@ +package Connector + +import ( + "fmt" + "strings" + "os" + "path/filepath" + "path" + "log" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go/aws/credentials" +) + +type is3 interface { + getUploader()(*s3manager.Uploader, error) + getDownloader()(*s3manager.Downloader, error) + getSession() (sess *session.Session) + uploadFile(absfilepath string, relfilepath string, uniqueid string) (error) + downloadFile(outfilepath string, key string, conn *s3manager.Downloader) (error) +} + +type S3connector struct { + Access_key_id string + Secret_access_key string + Default_region string + Bucket_url string + Endpoint string + Streams int + Blocksize int64 +} + +type jobS3 struct { + uniqueid string + bkpdir string +} + +type uploadJobS3 struct { + jobS3 + absfilepath string +} + +type jobResultS3 struct { + *jobS3 + err error +} + +type downloadJobS3 struct { + conn *s3manager.Downloader + key string + outfilepath string +} + +type downloadJobResultS3 struct { + key string + err error +} + + +func (j *uploadJobS3) uploadS3(conn *S3connector) error { + relfilepath, err := filepath.Rel(j.jobS3.bkpdir, j.absfilepath) + if err != nil { + return fmt.Errorf("Unable to traverse %s, %s: %v", j.jobS3.bkpdir, j.absfilepath, err) + } + + log.Println("Uploading file :", j.absfilepath) + return conn.uploadFileS3(j.absfilepath, relfilepath, j.uniqueid) +} + +func (c *S3connector) uploadFileS3(absfilepath string, relfilepath string, uniqueid string) (error){ + // Upload the file to a block blob + uploader := c.getUploader() + + file, err := os.Open(absfilepath) + if err != nil { + return fmt.Errorf("Error in opening backup file : %v", err) + } + + result,err := uploader.Upload(&s3manager.UploadInput{ + Bucket: &c.Bucket_url, + Key: aws.String(filepath.Join(uniqueid, relfilepath)), + Body: file, + }) + + if (result == nil) { + return fmt.Errorf("Error in uploading file : %v", err) + } + return err +} + +func (j *downloadJobS3) downloadS3(c *S3connector) error { + log.Println("Downloading file :", j.key) + return c.downloadFile(j.outfilepath, j.key, j.conn ) +} + +func (c *S3connector) downloadFile(outfilepath string, key string, conn *s3manager.Downloader) error { + filehandle, err := os.Create(outfilepath) + if err != nil { + return fmt.Errorf("Error in creating file inside backup dir: %v",err) + } + + defer filehandle.Close() + + numBytes, err := conn.Download(filehandle, + &s3.GetObjectInput{ + Bucket: aws.String(c.Bucket_url), + Key: aws.String(key), + }) + + if((numBytes < 0) && (err != nil)){ + return fmt.Errorf("Error in downloading File: %v",err) + } + + return err +} + +func (c *S3connector) getSession() (sess *session.Session) { + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(c.Default_region), + Credentials: credentials.NewStaticCredentials(c.Access_key_id,c.Secret_access_key,""), + CredentialsChainVerboseErrors: aws.Bool(true) }) + if (err != nil) { + log.Fatalln("Session failed:", err) + } + + return sess +} + +func (c *S3connector) getUploader() (*s3manager.Uploader) { + + sessn := c.getSession() + uploader := s3manager.NewUploader(sessn,func(u *s3manager.Uploader) { + u.PartSize = c.Blocksize * 1024 * 1024 // 64MB per part + u.Concurrency = c.Streams + }) + + return uploader +} + +func (c *S3connector) getDownloader() (*s3manager.Downloader, *session.Session) { + + sessn := c.getSession() + downloader := s3manager.NewDownloader(sessn,func(d *s3manager.Downloader) { + d.PartSize = c.Blocksize * 1024 * 1024 // 64MB per part + d.Concurrency = c.Streams + }) + + return downloader,sessn +} + +func (c *S3connector) Upload( otherargs *OtherArgs, backupinfo *BackupInfo ) (error){ + var err error + log.Println("Uploading Using S3 Connector") + + dirlist := strings.Split(backupinfo.Dir," ") + for _, bkpdir := range dirlist { + log.Println("Uploading backup data to aws s3 cloud from backup dir", bkpdir) + backupdir := filepath.Join(bkpdir, "Netezza", backupinfo.npshost, backupinfo.dbname, backupinfo.backupset, backupinfo.increment) + _, err = os.Stat(backupdir) + if err != nil { + return fmt.Errorf("Cannot access directory '%s': %v. Please check if DB name, hostname are correct.", backupdir, err) + } + + work := make(chan *uploadJobS3, otherargs.paralleljobs) + result := make(chan *jobResultS3, otherargs.paralleljobs) + done := make(chan bool) + + go func() { + for { + select { + case j, ok := <- work: + if ! ok { + // done + close(result) + return + } + err := j.uploadS3(c) + jr := jobResultS3{jobS3:&j.jobS3, err:err } + result <- &jr + } + } + }() + + filesuploaded := 0 + go func() { + for { + select { + case r, ok := <- result: + if ! ok { + // work done + done <- true + return + } + if r.err != nil { + // stopping right here so that we + // don't keep on uploading when one has failed + log.Println("Error while uploading file. Ensure aws s3 access-key-id, secret-access-key, bucket_url are correct. If error persists contact IBM support team.", *r.jobS3) + log.Fatalf("Failed to access AWS bucket: %s with error: %v", c.Bucket_url, r.err) + } + filesuploaded++ // this is fine, since this is single threaded increment + } + } + }() + + err = filepath.Walk(bkpdir, + func(absfilepath string, info os.FileInfo, err error) error { + if info.IsDir() { + return nil + } + if ( strings.HasPrefix(absfilepath, backupdir) ) { + j := uploadJobS3{ jobS3: jobS3{otherargs.uniqueid, bkpdir}, absfilepath: absfilepath } + work <- &j // this will hang until at least one of the prior uploads finish if other.paralleljobs + // are already running + } + return err + }) + + close(work) + <- done + if (err != nil) { + return(fmt.Errorf("Error reading directory: %s: %v. Please check if DB name, hostname are correct.", backupdir, err)) + } + log.Println("Upload using S3 connector successful for directory :", bkpdir) + log.Println("Total files uploaded:", filesuploaded) + } + return err +} + +func (cn *S3connector) Download(otherargs *OtherArgs, backupinfo *BackupInfo) (error){ + log.Println("Downloading backup data from aws cloud to backup dir", backupinfo.Dir) + var err error + outdir := backupinfo.Dir + locations:= []string{} + contents:= []string{} + work := make(chan *downloadJobS3, otherargs.paralleljobs) + result := make(chan *downloadJobResultS3, otherargs.paralleljobs) + done := make(chan bool) + + bkpath := filepath.Join(otherargs.uniqueid, "Netezza",backupinfo.npshost, backupinfo.dbname, backupinfo.backupset, backupinfo.increment) + // start the workers + go func() { + for { + select { + case j, ok := <- work: + if ! ok { + // done + close(result) + return + } + err := j.downloadS3(cn) + jr := downloadJobResultS3{ key:j.key, err:err } + result <- &jr + } + } + }() + filesdownloaded := 0 + go func() { + for { + select { + case r, ok := <- result: + if ! ok { + // work done + done <- true + return + } + if r.err != nil { + // stopping right here so that we + // don't keep on uploading when one has failed + log.Fatalf("%s: %v", r.key, r.err) + } + filesdownloaded++ // this is fine, since this is single threaded increment + } + } + }() + + down,sess := cn.getDownloader() + client := s3.New(sess) + params := &s3.ListObjectsInput{Bucket: &cn.Bucket_url, Prefix: &otherargs.uniqueid} + client.ListObjectsPages(params, func(page *s3.ListObjectsOutput, more bool) (bool) { + for _, obj := range page.Contents { + key := *obj.Key + if strings.HasPrefix(key, bkpath){ + // Create the directories in the path + + dir, filename := filepath.Split(key) + relfilepath, err := filepath.Rel(otherargs.uniqueid,dir) + + if err != nil { + log.Fatalf("Error in fetching download relative path: %v",err) + } + + dumpdir := filepath.Join(outdir, relfilepath) + err = os.MkdirAll(dumpdir, 0777) + if err != nil { + log.Fatalf("Error in creating backup directory structure: %v",err) + } + + switch filename { + case "locations.txt": + locations = append(locations, path.Join(dumpdir, filename)) + case "contents.txt": + contents = append(contents, path.Join(dumpdir, filename)) + } + + outfilepath := path.Join(dumpdir, filename) + j := downloadJobS3{ conn:down, key:key, outfilepath:outfilepath } + work <- &j + } + } + return true + }) + + close(work) + <- done + log.Println("Total files downloaded using S3 Connector:", filesdownloaded) + updateLocation(locations,outdir) + updateContents(contents) + return err +} diff --git a/bnr-utils/nzsyncbackup/factory/Factory.go b/bnr-utils/nzsyncbackup/factory/Factory.go new file mode 100644 index 0000000..7eaf89d --- /dev/null +++ b/bnr-utils/nzsyncbackup/factory/Factory.go @@ -0,0 +1,17 @@ +package Factory + +import ( + "nzsyncbackup/connector" +) + +func GetConnector(connectorType string, azconnect *Connector.AZConnector, s3connect *Connector.S3connector) Connector.IConnector { + switch connectorType { + case "aws": + return s3connect + case "azure": + return azconnect + default: + return nil + } +} + diff --git a/bnr-utils/nzsyncbackup/main.go b/bnr-utils/nzsyncbackup/main.go new file mode 100644 index 0000000..43dd962 --- /dev/null +++ b/bnr-utils/nzsyncbackup/main.go @@ -0,0 +1,50 @@ +package main + +import ( + "log" + "nzsyncbackup/connector" + "nzsyncbackup/factory" +) + +const( + upload = iota + download +) + + +func main() { + var backupinfo Connector.BackupInfo + var otherargs Connector.OtherArgs + var azconnect Connector.AZConnector + var s3connect Connector.S3connector + var err error +// parse input args + Connector.ParseArgs(&backupinfo, &otherargs, &azconnect, &s3connect) + connector := Factory.GetConnector(otherargs.Connector, &azconnect, &s3connect) + + if ( connector != nil ) { + Connector.SetOperation(&otherargs) + log.Println("OPeration is ", otherargs.Operation) + switch otherargs.Operation { + case upload: + Connector.SetUpLogFile(&backupinfo, &otherargs) + // now do the upload + log.Println("Uploading backup data to cloud for conector : ",otherargs.Connector) + err = connector.Upload(&otherargs, &backupinfo) + if (err != nil) { + log.Fatalln(err) + } + log.Println("Upload successful") + case download: + Connector.SetUpLogFile(&backupinfo, &otherargs) + log.Println("Downloading backup data from cloud for connector : ", otherargs.Connector) + err = connector.Download(&otherargs, &backupinfo) + if (err != nil) { + log.Fatalln(err) + } + log.Println("Download successful") + default: + log.Fatalln("Invalid Operation, Supported Upload/Download") + } + } +}