-
Notifications
You must be signed in to change notification settings - Fork 6
Deep dive: Import Export
The import/export mechanism hinges around 3 key components:
- An
ImportController
that managesImport
CRD instances - An
ExportController
that managesExport
CRD instances - A REST-based
SubscriptionBoard
service that exposes dynamic routes to PEs
The user scenario below explores, step by step, the role of these components in resolving import/export connections.
For this deep dive, we consider the following user scenario:
- User submits
Job A
with a single exported stream, namedstreams0
- User submits
Job B
that imports the named streamstreams0
$ kubectl apply -f JobA.yaml
Upon addition of the Job A
resource, the job controller inspects the AADLs
generated by the pipeline for imports and exports. In the case of Job A
, the
import set is empty.
Then, the job controller processes the import and export sets. For each export
the controller calls ExportFactory::addExport
to create an Export
CRD instance.
In the case of Job A
, a single Export
instance is created.
Upon addition of the new Export
resource, the export controller calls
ImportExportBroker::add
. For this new Export
, there is no Import
to be matched
so nothing is done.
$ kubectl apply -f JobB.yaml
Upon addition of the Job B
resource, the job controller inspects the AADLs
generated by the pipeline for imports and exports. In the case of Job B
,
the export set is empty.
Then, the job controller processes the import and export sets. For each import
the controller calls ImportFactory::addImport
to create an Import
CRD
instance. In the case of Job B
, a single Import
instance is created.
Upon addition of the new Import
resource, the import controller calls
ImportExportBroker::add
. The method will match the new Import
with all existing
Export
. In this scenario, the Export
created by Job A
is matched.
Since there is a match, the ImportExportBroker
updates the subscription
board with the new route information for the importing PE. Then, it sends a
datagram notification to the target PE.
PEs use a pull
model to update their dynamic routes. This model is implemented
using a periodic polling mechanism that fetches routes for a PE every minute.
Route updates only impact PEs with Import
operators. Job A
has no Import
operator,
so its dynamic routes are not updated.
For Job B
, once the subscription board is updated, the notification sent to
the PE is received. The content of the subscription board for that PE is
then fetched and compared to its local copy.
Finally, the modifications are translated into route ADD
, DEL
and
UPDATE
signals to the PE.
The bootstrapping section for imports and export is located in the job controller.
- Collect input ports that contain imported streams
- Create an
Import
CRD for each of the imported stream
- Collect output ports that contain exported streams
- Create an
Export
CRD for each of the exported stream group
The Import
CRD contains the following attributes:
- Its job ID
- Its PE ID
- Its port ID
- Its port index
- Its
ImportedStreams
object
The Export
CRD contains the following attributes:
- Its PE ID
- Its port ID
- Its
ExportedStreams
object
Import/Export matching is handled by a synchronized Broker
object.
When the Import
controller is notified of the creation of an Import
CRD instance,
Export
are matched through the broker. When the Export
controller is notified of
the creation of an Export
CRD instance, Import
are also matched through the broker.
Matched Import
and Export
and exposed to PEs using a REST service. The
service is updated by the broken each time an Import
or an Export
is
matched (here and here).
PEs use a pull
model to update their dynamic routes. This model is implemented
using a periodic polling mechanism that fetches routes for a PE every minute.
In addition to that, PEs are notified of a match using a datagram notification. This notification is unreliable by design.
PEs use the REST interface to implement filter and subscription changes from an operator (eg. adding properties).
If the instance controller restarts after a failure, the matching is
automatically recomputed using the values from existing Import
and Export
instances.