Skip to content

Commit 72910a0

Browse files
committed
Extract core processing loop to function
- The `defer` calls now clean up immediately - New `Config` and `Row` structs clean up the flow
1 parent 72b2b7f commit 72910a0

File tree

1 file changed

+93
-58
lines changed

1 file changed

+93
-58
lines changed

batch-processing/main.go

Lines changed: 93 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,21 @@ import (
2020
"gopkg.in/gographics/imagick.v2/imagick"
2121
)
2222

23+
type Config struct {
24+
AwsRoleArn string
25+
AwsRegion string
26+
S3Bucket string
27+
}
28+
29+
type Row struct {
30+
index int
31+
url string
32+
inputFilepath string
33+
outputFilepath string
34+
outputKey string
35+
outputUrl string
36+
}
37+
2338
func readAndValidateCsv(in io.Reader) ([][]string, error) {
2439
r := csv.NewReader(in)
2540
records, err := r.ReadAll()
@@ -39,6 +54,64 @@ func readAndValidateCsv(in io.Reader) ([][]string, error) {
3954
return records, nil
4055
}
4156

57+
func (row Row) handleRow(svc *s3.S3, config *Config) error {
58+
i, url, inputFilepath, outputFilepath := row.index, row.url, row.inputFilepath, row.outputFilepath
59+
// Create a new file that we will write to
60+
inputFile, err := os.Create(inputFilepath)
61+
if err != nil {
62+
return fmt.Errorf("error: row %d (%q): %v", i, url, err)
63+
}
64+
defer inputFile.Close()
65+
66+
// Get it from the internet!
67+
res, err := http.Get(url)
68+
if err != nil {
69+
return fmt.Errorf("error: row %d (%q): %v", i, url, err)
70+
}
71+
defer res.Body.Close()
72+
73+
// Ensure we got success from the server
74+
if res.StatusCode != http.StatusOK {
75+
return fmt.Errorf("error: download failed: row %d (%q): %s", i, url, res.Status)
76+
}
77+
78+
// Copy the body of the response to the created file
79+
_, err = io.Copy(inputFile, res.Body)
80+
if err != nil {
81+
return fmt.Errorf("error: row %d (%q): %v", i, url, err)
82+
}
83+
84+
// Convert the image to grayscale using imagemagick
85+
// We are directly calling the convert command
86+
_, err = imagick.ConvertImageCommand([]string{
87+
"convert", inputFilepath, "-set", "colorspace", "Gray", outputFilepath,
88+
})
89+
if err != nil {
90+
return fmt.Errorf("error: row %d (%q): %v", i, url, err)
91+
}
92+
93+
log.Printf("processed: row %d (%q) to %q\n", i, url, outputFilepath)
94+
95+
outputFile, err := os.Open(outputFilepath)
96+
if err != nil {
97+
return fmt.Errorf("error: row %d (%q): %v", i, url, err)
98+
}
99+
100+
// Uploads the object to S3. The Context will interrupt the request if the
101+
// timeout expires.
102+
_, err = svc.PutObject(&s3.PutObjectInput{
103+
Bucket: aws.String(config.S3Bucket),
104+
Key: aws.String(row.outputKey),
105+
Body: outputFile,
106+
})
107+
108+
if err != nil {
109+
return fmt.Errorf("error: row %d (%q): %v", i, url, err)
110+
}
111+
112+
return nil
113+
}
114+
42115
func main() {
43116
// We need a file to read from...
44117
inputCsv := flag.String("input", "", "A path to a CSV with a `url` column, containing URLs for images to be processed")
@@ -64,14 +137,20 @@ func main() {
64137
log.Fatalln("Please set S3_BUCKET environment variable")
65138
}
66139

140+
config := &Config{
141+
AwsRoleArn: awsRoleArn,
142+
AwsRegion: awsRegion,
143+
S3Bucket: s3Bucket,
144+
}
145+
67146
// Set up S3 session
68147
// All clients require a Session. The Session provides the client with
69148
// shared configuration such as region, endpoint, and credentials.
70149
sess := session.Must(session.NewSession())
71150

72151
// Create the credentials from AssumeRoleProvider to assume the role
73152
// referenced by the ARN.
74-
creds := stscreds.NewCredentials(sess, awsRoleArn)
153+
creds := stscreds.NewCredentials(sess, config.AwsRoleArn)
75154

76155
// Create service client value configured for credentials
77156
// from assumed role.
@@ -102,72 +181,28 @@ func main() {
102181
prefix := fmt.Sprintf("/tmp/%d-%d", time.Now().UnixMilli(), rand.Int())
103182
inputFilepath := fmt.Sprintf("%s.%s", prefix, "jpg")
104183
outputFilepath := fmt.Sprintf("%s-out.%s", prefix, "jpg")
184+
// Upload just using the final part of the output filepath
185+
outputKey := filepath.Base(outputFilepath)
186+
outputUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", config.S3Bucket, config.AwsRegion, outputKey)
105187

106188
log.Printf("downloading: row %d (%q) to %q\n", i, url, inputFilepath)
107189

108-
// Create a new file that we will write to
109-
inputFile, err := os.Create(inputFilepath)
110-
if err != nil {
111-
log.Printf("error: row %d (%q): %v\n", i, url, err)
112-
continue
113-
}
114-
defer inputFile.Close()
115-
116-
// Get it from the internet!
117-
res, err := http.Get(url)
118-
if err != nil {
119-
log.Printf("error: row %d (%q): %v\n", i, url, err)
120-
continue
121-
}
122-
defer res.Body.Close()
123-
124-
// Ensure we got success from the server
125-
if res.StatusCode != http.StatusOK {
126-
log.Printf("error: download failed: row %d (%q): %s\n", i, url, res.Status)
127-
continue
128-
}
129-
130-
// Copy the body of the response to the created file
131-
_, err = io.Copy(inputFile, res.Body)
132-
if err != nil {
133-
log.Printf("error: row %d (%q): %v\n", i, url, err)
134-
continue
190+
row := Row{
191+
index: i,
192+
url: url,
193+
inputFilepath: inputFilepath,
194+
outputFilepath: outputFilepath,
195+
outputKey: outputKey,
196+
outputUrl: outputUrl,
135197
}
136198

137-
// Convert the image to grayscale using imagemagick
138-
// We are directly calling the convert command
139-
_, err = imagick.ConvertImageCommand([]string{
140-
"convert", inputFilepath, "-set", "colorspace", "Gray", outputFilepath,
141-
})
199+
err := row.handleRow(svc, config)
142200
if err != nil {
143-
log.Printf("error: row %d (%q): %v\n", i, url, err)
201+
log.Printf("error: row %d (%q): %v", i, url, err)
144202
continue
145203
}
146204

147-
log.Printf("processed: row %d (%q) to %q\n", i, url, outputFilepath)
148-
149-
outputFile, err := os.Open(outputFilepath)
150-
if err != nil {
151-
log.Printf("error: row %d (%q): %v\n", i, url, err)
152-
continue
153-
}
154-
155-
// Upload just using the final part of the output filepath
156-
s3Key := filepath.Base(outputFilepath)
157-
158-
// Uploads the object to S3. The Context will interrupt the request if the
159-
// timeout expires.
160-
_, err = svc.PutObject(&s3.PutObjectInput{
161-
Bucket: aws.String(s3Bucket),
162-
Key: aws.String(s3Key),
163-
Body: outputFile,
164-
})
165-
if err != nil {
166-
log.Printf("failed to upload object: %v\n", err)
167-
}
168-
169-
outputUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3Bucket, awsRegion, s3Key)
170-
outputRecords = append(outputRecords, []string{url, inputFilepath, outputFilepath, outputUrl})
205+
outputRecords = append(outputRecords, []string{row.url, row.inputFilepath, row.outputFilepath, row.outputUrl})
171206

172207
log.Printf("uploaded: row %d (%q) to %s\n", i, url, outputUrl)
173208
}

0 commit comments

Comments
 (0)