Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 113 additions & 201 deletions uk_bin_collection/uk_bin_collection/councils/CotswoldDistrictCouncil.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,240 +4,152 @@

from bs4 import BeautifulSoup
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support.ui import WebDriverWait

from uk_bin_collection.uk_bin_collection.common import *
from uk_bin_collection.uk_bin_collection.get_bin_data import AbstractGetBinDataClass

_BIN_TYPES = {
"180 litre refuse", "black recycling box", "blue bag", "white bag",
"outdoor food caddy", "indoor food caddy", "garden waste",
"240 litre refuse", "recycling box", "food caddy",
}

class CouncilClass(AbstractGetBinDataClass):
"""
Concrete classes have to implement all abstract operations of the
base class. They can also override some operations with a default
implementation.
"""

class CouncilClass(AbstractGetBinDataClass):
def parse_data(self, page: str, **kwargs) -> dict:
driver = None
try:
page = "https://community.cotswold.gov.uk/s/waste-collection-enquiry"

data = {"bins": []}

house_number = kwargs.get("paon")
postcode = kwargs.get("postcode")
# Use house_number as full address since it contains the complete address
full_address = house_number if house_number else f"{house_number}, {postcode}"
if house_number and postcode and postcode.upper() not in house_number.upper():
full_address = f"{house_number}, {postcode}"
else:
full_address = house_number or postcode or ""
web_driver = kwargs.get("web_driver")
headless = kwargs.get("headless")

# Create Selenium webdriver
user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
driver = create_webdriver(web_driver, headless, user_agent, __name__)
driver.get(page)

# Wait for page to load completely
wait = WebDriverWait(driver, 60)

# Wait for the Salesforce Lightning page to be fully loaded
print("Waiting for Salesforce Lightning components to load...")
time.sleep(10)

# Wait for the address input field to be present
try:
wait.until(EC.presence_of_element_located((By.XPATH, "//label[contains(text(), 'Enter your address')]")))
print("Address label found")
time.sleep(5) # Additional wait for the input field to be ready
except Exception as e:
print(f"Address label not found: {e}")

# Find the address input field using the label
try:
address_entry_field = driver.find_element(By.XPATH, "//label[contains(text(), 'Enter your address')]/following-sibling::*//input")
print("Found address input field using label xpath")
except Exception as e:
print(f"Could not find address input field: {e}")
raise Exception("Could not find address input field")

# Clear any existing text and enter the address
try:
address_entry_field.clear()
address_entry_field.send_keys(str(full_address))
print(f"Entered address: {full_address}")
except Exception as e:
print(f"Error entering address: {e}")
raise

# Click the input field again to trigger the dropdown
try:
address_entry_field.click()
print("Clicked input field to trigger dropdown")
time.sleep(3) # Wait for dropdown to appear
except Exception as e:
print(f"Error clicking input field: {e}")

# Wait for and click the dropdown option
try:
dropdown_wait = WebDriverWait(driver, 10)
dropdown_option = dropdown_wait.until(EC.element_to_be_clickable((By.XPATH, "//li[@role='presentation']")))
dropdown_option.click()
print("Clicked dropdown option")
time.sleep(2)
except Exception as e:
print(f"Error clicking dropdown option: {e}")
raise

# Find and click the Next button
try:
next_wait = WebDriverWait(driver, 10)
next_button = next_wait.until(EC.element_to_be_clickable((By.XPATH, "//button[contains(text(), 'Next')]")))
next_button.click()
print("Clicked Next button")
time.sleep(5) # Wait for the bin collection data to load
except Exception as e:
print(f"Error clicking Next button: {e}")
raise

# Wait for the bin collection data table to load
try:
table_wait = WebDriverWait(driver, 15)
table_wait.until(EC.presence_of_element_located((By.XPATH, "//span[contains(text(), 'Collection Day')]")))
print("Bin collection data table loaded")
time.sleep(3)
except Exception as e:
print(f"Bin collection table not found: {e}")

time.sleep(8)

address_entry_field = wait.until(
EC.presence_of_element_located(
(By.XPATH, "//input[@role='combobox']")
)
)

address_entry_field.click()
time.sleep(1)
address_entry_field.send_keys(str(full_address))
time.sleep(4)

wait.until(
EC.element_to_be_clickable(
(By.XPATH, "//li[@role='presentation']")
)
)
all_opts = driver.find_elements(By.XPATH, "//li[@role='presentation']")
if not all_opts:
raise ValueError("No address options found in dropdown")
# Skip header entries (e.g. "Search" placeholder) at index 0
first_text = all_opts[0].text.strip().lower()
if first_text in ("search", "search...") and len(all_opts) > 1:
all_opts[1].click()
elif len(all_opts) > 1:
all_opts[-1].click()
else:
all_opts[0].click()
Comment on lines +59 to +69
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Single-option branch likely clicks the "Search" header.

According to the PR description, the first li[@ROLE='presentation'] is the "Search" entry, not an address. The else branch here clicks all_opts[0], i.e. exactly that header, so when the user types something that returns no address matches (only the header remains) the form will not advance. Either skip the click entirely in that case or assert at least one real address option is present.

🛠️ Suggested guard
-            all_opts = driver.find_elements(By.XPATH, "//li[`@role`='presentation']")
-            if len(all_opts) > 1:
-                all_opts[-1].click()
-            else:
-                all_opts[0].click()
+            all_opts = driver.find_elements(By.XPATH, "//li[`@role`='presentation']")
+            if len(all_opts) > 1:
+                # First li is the "Search" header; pick the last actual address result.
+                all_opts[-1].click()
+            else:
+                raise ValueError(
+                    "No address results returned for the supplied paon/postcode"
+                )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
all_opts = driver.find_elements(By.XPATH, "//li[@role='presentation']")
if len(all_opts) > 1:
all_opts[-1].click()
else:
all_opts[0].click()
all_opts = driver.find_elements(By.XPATH, "//li[`@role`='presentation']")
if len(all_opts) > 1:
# First li is the "Search" header; pick the last actual address result.
all_opts[-1].click()
else:
raise ValueError(
"No address results returned for the supplied paon/postcode"
)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@uk_bin_collection/uk_bin_collection/councils/CotswoldDistrictCouncil.py`
around lines 59 - 63, The code currently clicks all_opts[0] when only one
li[`@role`='presentation'] exists, but that first element is the non-selectable
"Search" header; modify the branch handling for the single-option case in the
method in CotswoldDistrictCouncil.py so it does not click the header: either
skip clicking when len(all_opts) == 1 (no-op) or raise/assert that a real
address option exists before clicking; specifically update the logic around the
all_opts variable and the XPath lookup so only a real address option (e.g.
all_opts[-1] when len>1) is clicked and the single-item case is ignored or fails
fast.

time.sleep(2)

next_button = wait.until(
EC.element_to_be_clickable(
(By.XPATH, "//button[contains(text(), 'Next')]")
)
)
driver.execute_script("arguments[0].click();", next_button)

for _ in range(8):
time.sleep(5)
if driver.find_elements(By.XPATH, "//*[contains(text(), 'Collection day')]"):
break

soup = BeautifulSoup(driver.page_source, features="html.parser")
current_year = datetime.now().year
rows = soup.find_all("tr", class_="slds-hint-parent")

# Try multiple approaches to find bin collection data
rows = []

# Try different table row selectors
table_selectors = [
"tr.slds-hint-parent",
"tr[class*='slds']",
"table tr",
".slds-table tr",
"tbody tr"
]

for selector in table_selectors:
rows = soup.select(selector)
if rows:
break

# If no table rows found, try to find any elements containing collection info
if not rows:
# Look for any elements that might contain bin collection information
collection_elements = soup.find_all(text=re.compile(r'(bin|collection|waste|recycling)', re.I))
if collection_elements:
# Try to extract information from the surrounding elements
for element in collection_elements[:10]: # Limit to first 10 matches
parent = element.parent
if parent:
text = parent.get_text().strip()
if text and len(text) > 10: # Only consider substantial text
# Try to extract date patterns
date_patterns = re.findall(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|\b\d{1,2}\s+\w+\s+\d{4}\b', text)
if date_patterns:
data["bins"].append({
"type": "General Collection",
"collectionDate": date_patterns[0]
})
break

# Process table rows if found
for row in rows:
try:
columns = row.find_all(["td", "th"])
if len(columns) >= 2:
# Try to identify container type and date
container_type = "Unknown"
collection_date = ""

# Look for header cell (th) for container type
th_element = row.find("th")
if th_element:
container_type = th_element.get_text().strip()
elif columns:
# If no th, use first column as type
container_type = columns[0].get_text().strip()

# Look for date in subsequent columns
for col in columns[1:] if th_element else columns[1:]:
col_text = col.get_text().strip()
if col_text:
if col_text.lower() == "today":
collection_date = datetime.now().strftime("%d/%m/%Y")
break
elif col_text.lower() == "tomorrow":
collection_date = (datetime.now() + timedelta(days=1)).strftime("%d/%m/%Y")
break
else:
# Try to parse various date formats
try:
# Clean the text
clean_text = re.sub(r"[^a-zA-Z0-9,\s/-]", "", col_text).strip()

# Try different date parsing approaches
date_formats = [
"%a, %d %B",
"%d %B %Y",
"%d/%m/%Y",
"%d-%m-%Y",
"%B %d, %Y"
]

for fmt in date_formats:
try:
parsed_date = datetime.strptime(clean_text, fmt)
if fmt == "%a, %d %B": # Add year if missing
if parsed_date.replace(year=current_year) < datetime.now():
parsed_date = parsed_date.replace(year=current_year + 1)
else:
parsed_date = parsed_date.replace(year=current_year)
collection_date = parsed_date.strftime("%d/%m/%Y")
break
except ValueError:
continue

if collection_date:
break
except Exception:
continue

# Add to data if we have both type and date
if container_type and collection_date and container_type.lower() != "unknown":
if rows:
for row in rows:
try:
th = row.find("th")
td = row.find("td")
if not th or not td:
continue
container_type = (
th.get("data-cell-value", "").strip()
or th.get_text(strip=True)
)
raw_date = (
td.get("data-cell-value", "").strip()
or td.get_text(strip=True)
)
if container_type and raw_date:
try:
parsed_date = self._parse_date(raw_date, current_year)
except (ValueError, AttributeError):
continue
data["bins"].append({
"type": container_type,
"collectionDate": collection_date
"collectionDate": parsed_date,
})
except (ValueError, AttributeError):
continue
else:
body_text = driver.find_element(By.TAG_NAME, "body").text
lines = [l.strip() for l in body_text.split("\n") if l.strip()]
for i, line in enumerate(lines):
if line.lower() in _BIN_TYPES and i + 1 < len(lines):
raw_date = lines[i + 1]
if self._looks_like_date(raw_date):
try:
parsed_date = self._parse_date(raw_date, current_year)
except (ValueError, AttributeError):
continue
data["bins"].append({
"type": line,
"collectionDate": parsed_date,
})
Comment thread
coderabbitai[bot] marked this conversation as resolved.
except Exception as e:
print(f"Error processing row: {e}")
continue

# If no data found, add a debug entry
if not data["bins"]:
print("No bin collection data found. Page source:")
print(driver.page_source[:1000]) # Print first 1000 chars for debugging

except Exception as e:
# Here you can log the exception if needed
print(f"An error occurred: {e}")
print(f"Full address used: {full_address}")
print(f"Page URL: {page}")
# Add some debug information
if driver:
print(f"Current page title: {driver.title}")
print(f"Current URL: {driver.current_url}")
# Optionally, re-raise the exception if you want it to propagate
raise
finally:
# This block ensures that the driver is closed regardless of an exception
if driver:
driver.quit()
return data
return data

@staticmethod
def _looks_like_date(text):
t = text.lower().strip()
return t in ("today", "tomorrow") or bool(re.match(r"^(mon|tue|wed|thu|fri|sat|sun)", t))

@staticmethod
def _parse_date(raw_date, current_year):
t = raw_date.lower().strip()
if t == "today":
return datetime.now().strftime(date_format)
if t == "tomorrow":
return (datetime.now() + timedelta(days=1)).strftime(date_format)
cleaned = re.sub(r"[^\w\s,]", "", raw_date)
parsed = datetime.strptime(cleaned, "%a, %d %B")
parsed = parsed.replace(year=current_year)
if parsed.date() < datetime.now().date():
parsed = parsed.replace(year=current_year + 1)
return parsed.strftime(date_format)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Loading