Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 95 additions & 82 deletions bnr-utils/nz_azConnector/nz_azConnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ func updateContents(arrContents []string){
}
}


func main() {
var conn Conn
var backupinfo BackupInfo
Expand All @@ -384,25 +385,37 @@ func main() {
// parse input args
parseArgs(&conn, &backupinfo, &othargs)
flag.Parse()

requiredFlags := []string{ "db","dir","npshost","storage-account","key","container",
"upload","backupset","uniqueid","logfiledir","streams","paralleljobs","blocksize"}

if flag.NFlag() == 0 {
fmt.Println("No arguments passed to nz_azConnector. Below is the list of valid args:")
for _, flagName := range requiredFlags {
if flag.Lookup(flagName).Value.String() == "" {
fmt.Printf("Error: %s Invalid flag\n", flagName)
flag.PrintDefaults()
os.Exit(1)
}
Comment on lines +389 to +397

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to first look at env variables and if not set then error out for required flags

requiredFlags := map[string]string { 
    "db" : "NZ_DATABASE", 
    "dir" : "",
    "npshost" : "NZ_HOST",
    // and so on for the rest.
};

ensureDefault := func(f string, env string) bool {
    val := flag.Lookup(f).Value
    if Value.String() == "" && env != " {
        val.Set(os.Getenv(env));
    }
    return val.String() != "";
}

for f, env := range requiredFlags {
    if !ensureDefault(f, env) {
        fmt.Fprintf(stderr, "-%s is required", f)
        os.Exit(1)
    }
}

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I will work on this

}

if flag.NFlag() == 0{
fmt.Println("No arguments passed to nz_azConnector. Below is the list of valid args:")
flag.PrintDefaults()
os.Exit(1)
Comment on lines +401 to +403

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should run gofmt

}
// log file configuration setup
logfilename := fmt.Sprintf("nz_azConnector_%d_%s.log", os.Getppid(), time.Now().Format("2006-01-02"))
logfilepath := path.Join(othargs.logfiledir, logfilename)
filehandle, err := os.OpenFile(logfilepath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
fmt.Errorf("Error in opening logfile: %v",err)
}
fmt.Errorf("Error in opening logfile: %v",err)
}


w := io.MultiWriter(os.Stdout, filehandle)
log.SetOutput(w)
prefixStr := fmt.Sprintf("%s ", time.Now().UTC().Format("2006-01-02 15:04:05 EST")) + fmt.Sprintf("%-7s", "[INFO]")
prefixStr := fmt.Sprintf("%s ", time.Now().UTC().Format("2006-01-02 15:04:05 EST")) + fmt.Sprintf("%-7s", "[INFO]")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

log.SetFlags(0)
log.SetPrefix(prefixStr)

dirlist := strings.Split(backupinfo.dirs," ")
log.Println("Azure account name :", conn.azaccount)
log.Println("Azure container :", conn.azcontainer)
Expand All @@ -414,83 +427,83 @@ func main() {
log.Println("BackupsetID :", backupinfo.backupsetID)
log.Println("UniqueID :", othargs.uniqueid)
log.Println("Number of files to upload/download in parallel :", othargs.paralleljobs)

for _, bkpdir := range dirlist {
if (*othargs.upload) {

// now do the upload
log.Println("Uploading backup data to azure cloud from backup dir", bkpdir)
backupdir := filepath.Join(bkpdir, "Netezza", backupinfo.npshost, backupinfo.dbname, backupinfo.backupsetID)
_, err = os.Stat(backupdir)
if err != nil {
handleErrors(fmt.Errorf("Cannot access directory '%s': %v. Please check if DB name, hostname are correct.", backupdir, err))
}

work := make(chan *uploadJob, othargs.paralleljobs)
result := make(chan *jobResult, othargs.paralleljobs)
done := make(chan bool)

go func() {
for {
select {
case j, ok := <- work:
if ! ok {
// done
close(result)
return
}
err := j.upload()
jr := jobResult{ job:&j.job, err:err }
result <- &jr
}
}
}()

filesuploaded := 0
go func() {
for {
select {
case r, ok := <- result:
if ! ok {
// work done
done <- true
return
}
if r.err != nil {
// stopping right here so that we
// don't keep on uploading when one has failed
log.Println("Error while uploading file. Ensure azure storage account name, azure key and container name are correct. If error persists contact IBM support team.", *r.job)
log.Fatalf("Azure storage account:%s accessing container:%s failed with error: %v", r.job.conn.azaccount, r.job.conn.azcontainer, r.err)
}
filesuploaded++ // this is fine, since this is single threaded increment
}
}
if (*othargs.upload) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to have messed up the formatting for the next block here.


// now do the upload
log.Println("Uploading backup data to azure cloud from backup dir", bkpdir)
backupdir := filepath.Join(bkpdir, "Netezza", backupinfo.npshost, backupinfo.dbname, backupinfo.backupsetID)
_, err = os.Stat(backupdir)
if err != nil {
handleErrors(fmt.Errorf("Cannot access directory '%s': %v. Please check if DB name, hostname ar e correct.", backupdir, err))
}

work := make(chan *uploadJob, othargs.paralleljobs)
result := make(chan *jobResult, othargs.paralleljobs)
done := make(chan bool)

go func() {
for {
select {
case j, ok := <- work:
if ! ok {
// done
close(result)
return
}
err := j.upload()
jr := jobResult{ job:&j.job, err:err }
result <- &jr
}
}
}()

err = filepath.Walk(backupdir,
func(absfilepath string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
j := uploadJob{ job: job{conn, othargs.uniqueid, bkpdir}, absfilepath: absfilepath }
work <- &j // this will hang until at least one of the prior uploads finish if other.paralleljobs
// are already running
return err
})
close(work)
<- done
if err != nil {
handleErrors(fmt.Errorf("Error reading directory: %s: %v. Please check if DB name, hostname are correct.", backupdir, err))
}
log.Println("Upload successful. Total files uploaded:", filesuploaded)
}

if (*othargs.download) {
log.Println("Downloading backup data from azure cloud to restore dir", bkpdir)
blobpath := filepath.Join(othargs.uniqueid, "Netezza",backupinfo.npshost, backupinfo.dbname, backupinfo.backupsetID)
err = conn.downloadBkp(bkpdir, othargs.uniqueid, blobpath, conn.streams, othargs.paralleljobs)
handleErrors(err)
log.Println("Download successful")
}
}
filesuploaded := 0
go func() {
for {
select {
case r, ok := <- result:
if ! ok {
// work done
done <- true
return
}
if r.err != nil {
// stopping right here so that we
// don't keep on uploading when one has failed
log.Println("Error while uploading file. Ensure azure storage account name, azure k ey and container name are correct. If error persists contact IBM suppor t team.", *r.job)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be copy paste mess up

log.Fatalf("Azure storage account:%s accessing container:%s failed with error: %v", r.job.conn.azaccount, r.job.conn.azcontainer, r.err)
}
filesuploaded++ // this is fine, since this is single threaded increment
}
}
}()

err = filepath.Walk(backupdir,
func(absfilepath string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
j := uploadJob{ job: job{conn, othargs.uniqueid, bkpdir}, absfilepath: absfilepath }
work <- &j // this will hang until at least one of the prior uploads finish if other.para lleljobs
// are already running
return err
})
close(work)
<- done
if err != nil {
handleErrors(fmt.Errorf("Error reading directory: %s: %v. Please check if DB name, hostname ar e correct.", backupdir, err))
}
log.Println("Upload successful. Total files uploaded:", filesuploaded)
}

if (*othargs.download) {
log.Println("Downloading backup data from azure cloud to restore dir", bkpdir)
blobpath := filepath.Join(othargs.uniqueid, "Netezza",backupinfo.npshost, backupinfo.dbname, backupinfo.backupsetID)
err = conn.downloadBkp(bkpdir, othargs.uniqueid, blobpath, conn.streams, othargs.paralleljobs)
handleErrors(err)
log.Println("Download successful")
}
}
}