Skip to content

Commit 068c059

Browse files
authored
Update google_cloud.py
1 parent 408da39 commit 068c059

File tree

1 file changed

+75
-13
lines changed

1 file changed

+75
-13
lines changed

integrations/google_cloud.py

Lines changed: 75 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,77 @@
1+
import logging
2+
import pandas as pd
13
from google.cloud import storage
4+
from io import StringIO
25

3-
def upload_data_to_gcs(data, bucket_name, file_name):
    """
    Upload a DataFrame to Google Cloud Storage as a CSV object.

    The DataFrame is serialized with ``to_csv(index=False)``, so the index
    is not written to the uploaded file.

    :param data: Pandas DataFrame with data
    :param bucket_name: GCS bucket name
    :param file_name: Name (path) of the object to write inside the bucket
    :return: None
    """
    client = storage.Client()
    # get_bucket() looks the bucket up remotely, so a missing or inaccessible
    # bucket fails here rather than during the upload itself.
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(file_name)
    # upload_from_string() defaults to text/plain; label the payload as CSV
    # so the stored object's metadata matches its content.
    blob.upload_from_string(data.to_csv(index=False), content_type="text/csv")
6+
# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)
# Configure root logging at INFO; basicConfig() is a no-op when the root
# logger already has handlers, so an application's own setup wins.
logging.basicConfig(level=logging.INFO)
9+
10+
class GCSUploader:
    """Upload pandas DataFrames to a single Google Cloud Storage bucket.

    One ``storage.Client`` is created when the uploader is constructed and
    reused for every upload made through it.
    """

    def __init__(self, bucket_name: str):
        """Initialize the GCSUploader with the specified bucket name.

        Args:
            bucket_name (str): The name of the GCS bucket.
        """
        self.bucket_name = bucket_name
        self.client = storage.Client()

    def upload_data(self, data: pd.DataFrame, file_name: str, file_format: str = 'csv'):
        """Serialize ``data`` in the requested format and upload it to GCS.

        Args:
            data (pd.DataFrame): Pandas DataFrame with data.
            file_name (str): The name of the file to be saved in GCS.
            file_format (str): The format of the file ('csv' or 'json'). Default is 'csv'.

        Raises:
            ValueError: If ``file_format`` is neither 'csv' nor 'json'.
                Raised before any serialization or network call happens.
            Exception: Any error raised while serializing or uploading is
                propagated unchanged to the caller.
        """
        # Validate the format up front so a caller mistake is not reported
        # (and logged) as a failed upload.
        if file_format == 'csv':
            upload = self.upload_csv
        elif file_format == 'json':
            upload = self.upload_json
        else:
            raise ValueError("Unsupported file format. Use 'csv' or 'json'.")

        # No try/except here: _upload_to_gcs already logs upload failures and
        # re-raises, so catching again would log the same error twice.
        upload(data, file_name)

    def upload_csv(self, data: pd.DataFrame, file_name: str):
        """Upload DataFrame as a CSV file to GCS (the index is not written)."""
        # to_csv() returns the CSV text when no target buffer is given.
        self._upload_to_gcs(data.to_csv(index=False), file_name, 'text/csv')

    def upload_json(self, data: pd.DataFrame, file_name: str):
        """Upload DataFrame as a JSON file to GCS.

        The payload is written with ``orient='records', lines=True``, i.e.
        one JSON object per row, one row per line.
        """
        # to_json() returns the JSON text when no target buffer is given.
        payload = data.to_json(orient='records', lines=True)
        self._upload_to_gcs(payload, file_name, 'application/json')

    def _upload_to_gcs(self, data: str, file_name: str, content_type: str):
        """Helper method to upload data to GCS.

        Args:
            data (str): Serialized payload to store.
            file_name (str): Name of the object inside the bucket.
            content_type (str): Content type recorded on the uploaded object.

        Raises:
            Exception: Any error raised by the storage client is logged with
                its traceback and then re-raised.
        """
        try:
            bucket = self.client.bucket(self.bucket_name)
            blob = bucket.blob(file_name)
            blob.upload_from_string(data, content_type=content_type)
        except Exception as exc:
            # logger.exception keeps the traceback alongside the message.
            logger.exception("An error occurred while uploading to GCS: %s", exc)
            raise
        else:
            logger.info("Successfully uploaded %s to %s.", file_name, self.bucket_name)
64+
65+
# Example usage
66+
if __name__ == "__main__":
    # Destination for the demo upload; replace the bucket with a real one.
    target_bucket = 'your-bucket-name'
    target_path = 'data/sample_data.csv'

    # Small two-column frame used as the demo payload.
    sample_frame = pd.DataFrame({
        'Column1': [1, 2, 3],
        'Column2': ['A', 'B', 'C'],
    })

    # Build the uploader and push the frame as CSV in one step.
    GCSUploader(target_bucket).upload_data(sample_frame, target_path, file_format='csv')

0 commit comments

Comments
 (0)