Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 83 additions & 30 deletions ckanext/dcat/harvesters/rdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,36 +213,24 @@ def gather_stage(self, harvest_job):

source_dataset = model.Package.get(harvest_job.source.id)

for dataset in parser.datasets():
if not dataset.get('name'):
dataset['name'] = self._gen_new_name(dataset['title'])
if dataset['name'] in self._names_taken:
suffix = len([i for i in self._names_taken if i.startswith(dataset['name'] + '-')]) + 1
dataset['name'] = '{}-{}'.format(dataset['name'], suffix)
self._names_taken.append(dataset['name'])

# Unless already set by the parser, get the owner organization (if any)
# from the harvest source dataset
if not dataset.get('owner_org'):
if source_dataset.owner_org:
dataset['owner_org'] = source_dataset.owner_org

# Try to get a unique identifier for the harvested dataset
guid = self._get_guid(dataset, source_url=source_dataset.url)

if not guid:
self._save_gather_error('Could not get a unique identifier for dataset: {0}'.format(dataset),
harvest_job)
continue

dataset['extras'].append({'key': 'guid', 'value': guid})
guids_in_source.append(guid)

obj = HarvestObject(guid=guid, job=harvest_job,
content=json.dumps(dataset))

obj.save()
object_ids.append(obj.id)
series_ids, series_mapping = self._parse_and_collect(
parser.dataset_series(),
source_dataset,
harvest_job,
guids_in_source,
is_series=True,
collect_series_mapping=True
)
object_ids += series_ids

object_ids += self._parse_and_collect(
parser.datasets(series_mapping),
source_dataset,
harvest_job,
guids_in_source,
is_series=False,
collect_series_mapping=False
)
except Exception as e:
self._save_gather_error('Error when processsing dataset: %r / %s' % (e, traceback.format_exc()),
harvest_job)
Expand Down Expand Up @@ -422,3 +410,68 @@ def import_stage(self, harvest_object):
model.Session.commit()

return True

    def _parse_and_collect(
        self,
        items,
        source_dataset,
        harvest_job,
        guids_in_source,
        is_series=False,
        collect_series_mapping=False
    ):
        """Turn parsed dataset (or dataset series) dicts into HarvestObjects.

        For each item: ensures a unique ``name``, fills in ``owner_org`` from
        the harvest source when missing, resolves a GUID, records the GUID in
        ``guids_in_source``, and saves a ``HarvestObject`` whose content is the
        JSON-serialized item.

        :param items: iterable of dataset dicts produced by the RDF parser
        :param source_dataset: the harvest source's own package (provides
            ``owner_org`` and ``url``)
        :param harvest_job: current HarvestJob, used for objects and errors
        :param guids_in_source: list mutated in place with each resolved GUID
        :param is_series: when True, items are dataset series (affects only
            the label used in error messages and default titles)
        :param collect_series_mapping: when True, also build and return a
            mapping of series URI -> {"id", "name"} for later lookup

        :returns: list of HarvestObject ids; or, when
            ``collect_series_mapping`` is True, a 2-tuple
            ``(object_ids, series_mapping)``.
        """
        object_ids = []
        label = "dataset series" if is_series else "dataset"
        # Only allocate the mapping when the caller asked for it; None
        # doubles as the "don't collect" flag below.
        series_mapping = {} if collect_series_mapping else None

        for item in items:
            # Fall back to the generic label when the item has no title.
            original_title = item.get("title", label)
            if not item.get("name"):
                item["name"] = self._gen_new_name(original_title)

            # De-duplicate names within this gather run by appending a
            # numeric suffix based on how many "-" variants already exist.
            if item["name"] in self._names_taken:
                suffix = len([i for i in self._names_taken if i.startswith(item["name"] + "-")]) + 1
                item["name"] = f"{item['name']}-{suffix}"

            self._names_taken.append(item["name"])

            # Unless already set by the parser, inherit the owner org
            # from the harvest source dataset.
            if not item.get("owner_org") and source_dataset.owner_org:
                item["owner_org"] = source_dataset.owner_org

            # A unique identifier is mandatory; skip the item otherwise.
            guid = self._get_guid(item, source_url=source_dataset.url)
            if not guid:
                self._save_gather_error(f"Could not get a unique identifier for {label}: {item}", harvest_job)
                continue

            item.setdefault("extras", []).append({"key": "guid", "value": guid})
            guids_in_source.append(guid)

            obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(item))
            obj.save()
            object_ids.append(obj.id)

            # Store mapping of RDF URI to dataset name if requested
            if collect_series_mapping:
                series_uri = item.get("uri") or item.get("identifier")
                if series_uri:
                    # Try to find an existing active dataset series by 'guid' match
                    existing = model.Session.query(model.Package).\
                        join(model.PackageExtra).\
                        filter(model.PackageExtra.key == 'guid').\
                        filter(model.PackageExtra.value == series_uri).\
                        filter(model.Package.type == 'dataset_series').\
                        filter(model.Package.state == 'active').\
                        first()

                    # NOTE(review): this rename happens AFTER the
                    # HarvestObject content was serialized above, so the
                    # saved object keeps the generated name while the
                    # mapping uses the existing package's name — confirm
                    # this is intentional.
                    if existing:
                        item["name"] = existing.name

                    # "id" may be None when the series is new and the
                    # parser supplied no "id"; consumers must handle that.
                    series_mapping[str(series_uri)] = {
                        "id": existing.id if existing else item.get("id"),
                        "name": item["name"]
                    }

        if collect_series_mapping:
            return object_ids, series_mapping

        return object_ids
44 changes: 43 additions & 1 deletion ckanext/dcat/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ def _datasets(self):
for dataset in self.g.subjects(RDF.type, DCAT.Dataset):
yield dataset

def _dataset_series(self):
'''
Generator that returns all DCAT dataset series on the graph

Yields rdflib.term.URIRef objects that can be used on graph lookups
and queries
'''
for dataset_series in self.g.subjects(RDF.type, DCAT.DatasetSeries):
yield dataset_series

def next_page(self):
'''
Returns the URL of the next page or None if there is no next page
Expand Down Expand Up @@ -173,7 +183,7 @@ def supported_formats(self):
for plugin
in rdflib.plugin.plugins(kind=rdflib.parser.Parser)])

def datasets(self):
def datasets(self, series_mapping=None):
'''
Generator that returns CKAN datasets parsed from the RDF graph

Expand All @@ -193,6 +203,38 @@ def datasets(self):
)
profile.parse_dataset(dataset_dict, dataset_ref)

# Add in_series if present in RDF and mapped
in_series = []
for series_ref in self.g.objects(dataset_ref, DCAT.inSeries):
key = str(series_ref)
if series_mapping and key in series_mapping:
in_series.append(series_mapping[key]["id"])

if in_series:
dataset_dict["in_series"] = in_series

yield dataset_dict

def dataset_series(self):
'''
Generator that returns CKAN dataset series parsed from the RDF graph

Each dataset series is passed to all the loaded profiles before being
yielded, so it can be further modified by each one of them.

Returns a dataset series dict that can be passed to eg `package_create`
or `package_update`
'''
for dataset_ref in self._dataset_series():
dataset_dict = {}
for profile_class in self._profiles:
profile = profile_class(
self.g,
dataset_type=self.dataset_type,
compatibility_mode=self.compatibility_mode
)
profile.parse_dataset(dataset_dict, dataset_ref)

yield dataset_dict


Expand Down
9 changes: 9 additions & 0 deletions ckanext/dcat/profiles/euro_dcat_ap_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ def parse_dataset(self, dataset_dict, dataset_ref):
if values:
dataset_dict["has_version"] = values

# Check if it's a dataset series
if (dataset_ref, RDF.type, DCAT.DatasetSeries) in self.g:
dataset_dict["type"] = "dataset_series"

if "series_order_field" not in dataset_dict:
dataset_dict["series_order_field"] = "metadata_created"
if "series_order_type" not in dataset_dict:
dataset_dict["series_order_type"] = "date"

return dataset_dict

def graph_from_dataset(self, dataset_dict, dataset_ref):
Expand Down