
Design Document: Pipeline Domains #2265

@ssanderson

Description

This issue is a work-in-progress public design document for a series of upcoming changes to Zipline that move us toward the goal of supporting international equities. Specifically, this document outlines the design and motivation for adding international equity support to the Pipeline API via the concept of domains.

Tasks:

Open Questions

  • The design for specialization proposed here complicates the responsibilities of the get_loader function used to determine what loader should be used to service a given LoadableTerm. Previously, most get_loader functions could be constructed using a simple dictionary from BoundColumn -> PipelineLoader. With this design, however, users will need to take care to call specialize ahead of time if they intend to produce a get_loader function for pipelines running on a particular domain.

    Alternatively, users may want to implement a fallback mechanism for looking up specialized columns in terms in a dictionary that may contain the generic version of the associated column (e.g., I might want to register a loader for all EquityPricing queries, and have dispatching behave as expected when a lookup happens for a concrete specialization. Something like @jnazaren's work in ENH: Add extension point for dataset-loader associations #2246 might help simplify the work required here.)
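A fallback mechanism along these lines might look like the following sketch. It uses stand-in objects rather than real zipline columns, and the `unspecialize` method name is an assumption about the eventual API, not a commitment:

```python
# Sketch of a get_loader with generic-column fallback. `Column` and
# `unspecialize` are stand-ins for illustration; the real API may differ.
class Column:
    def __init__(self, name, domain=None, generic=None):
        self.name = name
        self.domain = domain
        self._generic = generic

    def unspecialize(self):
        # A specialized column points back at its generic counterpart;
        # a generic column is its own unspecialization.
        return self._generic if self._generic is not None else self


def get_loader(loaders, column):
    """Look up a loader for `column`, falling back to its generic version."""
    # Exact match first (e.g. a loader registered for a US specialization).
    if column in loaders:
        return loaders[column]
    # Otherwise fall back to a loader registered for the generic column
    # (e.g. a loader registered for all EquityPricing queries).
    generic = column.unspecialize()
    if generic in loaders:
        return loaders[generic]
    raise KeyError(column.name)


close = Column("close")                          # generic EquityPricing.close
us_close = Column("close", "US", generic=close)  # hypothetical US specialization
loaders = {close: "pricing_loader"}
```

With this fallback in place, a lookup for the specialized `us_close` resolves to the loader registered for the generic `close`.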

TL;DR

We're adding a new idea of a "domain" to the Pipeline API. The domain of a Pipeline determines the top-level set of assets that the pipeline computes over, as well as the calendar used to align and bucket data observations. Concretely, these correspond to the column and row labels of the 2D arrays passed to CustomFactor.compute (as well as to PipelineLoader.load_adjusted_array, and various other functions internal to the pipeline machinery).

  • In the near term, we intend to use domains to support the creation of pipelines and pipeline expressions that are parametric over a user's choice of a single market. As part of this work, we're adding the ability to express pipeline computations that are generic, in the sense that they can be re-used without changes on any market for which the user has provided data.
  • In the medium term, we may extend domains to support set-like operations, allowing users to compute over unions of domains (e.g., US + Japan + Germany).
  • In the long term, we might consider generalizing further to support pipeline computations over non-asset domains like companies, countries or sectors.

The name "domain" refers to the mathematical concept of the "domain of a function", which is the set of potential inputs to a function. We've chosen a somewhat abstract name rather than a more domain-specific name (e.g., "Market") to reflect the fact that we may extend the usage of domains in the future to a more general setting.

Background

In Pipeline today, Filters, Factors, and Classifiers (collectively "pipeline expressions") define transformations on two-dimensional blocks of data indexed by (date, asset).

Users can define new pipeline expressions by declaring:

  1. A list of desired inputs.
  2. A window length for each desired input.
  3. A compute function.

When a user runs a Pipeline, their compute function is called every trading day with data corresponding to their requested inputs and window length.

Example:

import numpy as np

from zipline.pipeline import CustomFactor
from zipline.pipeline.data import USEquityPricing

class VerboseFiveDayVWAP(CustomFactor):
    inputs = [USEquityPricing.close, USEquityPricing.volume]
    window_length = 5

    def compute(self, today, assets, out, closes, volumes):
        print("Today is:", today)
        print("Assets:", assets)
        print("Closes:\n", closes)
        print("Closes Shape:", closes.shape)
        print("============================")
        out[:] = np.nanmean(closes * volumes, axis=1)

If I run a Pipeline with this expression from 2017-01-03 to 2017-01-04, it prints (roughly) the following output:

Today is 2017-01-03 00:00:00+00:00
Assets: Int64Index([2, 21, 24, ..., 50569], dtype='int64', length=8343)
Closes:
 [[ 19.78 3.977 116.52 ..., nan]
 [ 19.77 3.75 117.25 ..., nan]
 [ 19.25 3.8 116.74 ..., nan]
 [ 18.71 3.931 116.73 ..., nan]
 [ 18.55 4.1 115.84 ..., 25.05]]
Closes Shape: (5, 8343)

============================

Today is 2017-01-04 00:00:00+00:00
Assets: Int64Index([2, 21, 24, ..., 50569], dtype='int64', length=8337)
Closes:
 [[ 19.77 3.75 117.25 ..., nan]
 [ 19.25 3.8 116.74 ..., nan]
 [ 18.71 3.931 116.73 ..., nan]
 [ 18.55 4.1 115.84 ..., 25.05]
 [ 19.19 4.1 116.14 ..., 25.42]]
Closes Shape: (5, 8337)

Every day, the factor gets passed two (5 x N) arrays, where N is the number of assets trading on the day in question. There are 5 rows because the window_length of VerboseFiveDayVWAP was set to 5.

What Doesn't Work in the Current API?

Implicit US Equity Assumptions

There are a few important features of the above example that implicitly assume we're computing over US Equities:

  1. Our compute function is called at 8:45 US/Eastern at the start of each US Equity trading day.
  2. The rows of our price and volume arrays correspond to the window_length most recent US Equity trading days.
  3. The columns of our price and volume arrays change every day to reflect the creation and destruction of new US Equities.

To support international markets in Pipeline, we need to replace these assumptions with user-specified parameters.

US-Specific Defaults and Names

One of the most useful features of the Pipeline API is the fact that we provide a large number of "built-in Factors", which give users access to efficient, (mostly) documented, and well-tested implementations of common transformations. Unfortunately, almost all of our built-in factors currently only work on US data. In particular, many of our factors use the USEquityPricing dataset. For example, the definition of our built-in Returns factor is:

from zipline.pipeline.data import USEquityPricing

class Returns(CustomFactor):
    inputs = [USEquityPricing.close]

    def compute(self, today, assets, out, closes):
        out[:] = (closes[-1] - closes[0]) / closes[0]

This definition is problematic in a multi-market world, because it means that we would have to provide separate factor definitions for each new market (e.g. CanadianReturns, UKReturns, etc.), even though nothing about this computation actually depends on being associated with a particular market.

We should be able to provide a single market-agnostic definition for Returns (and any other factor defined in terms of OHLCV data), and have that definition behave as expected for a pipeline run on a non-US market. In short, we want to support a concept of generic pipeline expressions that don't make any assumptions about the domains of their inputs. It should be possible to compute such generic expressions on any domain (assuming that their inputs can be computed on that domain).
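To illustrate why a single definition suffices, here is the core of the Returns computation extracted from the zipline machinery. Nothing in it depends on which market the closes came from; only the labels of the array's columns and rows are market-specific:

```python
import numpy as np

# Market-agnostic core of the Returns computation: given a
# (window_length x N) array of closes, compute the percent change
# from the oldest to the most recent row, one value per asset.
def returns_compute(closes):
    return (closes[-1] - closes[0]) / closes[0]

closes = np.array([[100.0, 50.0],   # oldest closes
                   [110.0, 45.0]])  # most recent closes
returns_compute(closes)  # -> array([ 0.1, -0.1])
```

The same function works unchanged whether the columns are US, Canadian, or Australian equities; all that changes is which assets and trading days label the array.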

Concrete Use-Cases

Parametrizing Pipelines over Individual Markets

The first use-case we aim to support is allowing users to run pipelines that are parametric over the choice of a single market. This essentially amounts to allowing users to replace each reference to "US Equity" in the outline above with their choice of market.

For example, to support a user running a Pipeline for an Australian market, the above bullets would change to:

  1. Our compute function is called at 9:15 Australia/Sydney at the start of each Australian trading day.
  2. The rows of our price and volume arrays correspond to the window_length most recent Australian trading days.
  3. The columns of our price and volume arrays change every day to reflect the creation and destruction of new Australian Equities.

Parametrizing Pipelines over Multiple Markets

In the future, we may also want to support execution of pipelines that span more than one market. For example, we'd like to allow a user to say "Run this pipeline over all equities trading in the US or in Canada".

Multi-market pipelines are not part of our short-term roadmap, because they introduce significant new complexity. (In particular, it's not clear that there's a single correct answer for how to align data from multiple calendars into a single array.) Nevertheless, we want to ensure that the design for domains and single-market pipelines will allow us to support multi-market pipelines if and when we want to tackle that problem.

Parametrizing Pipelines over Non-Asset Domains

In the very long term, an interesting application for domains would be to support pipeline-style computations over collections other than share classes. For example, in an international context, it would be useful to be able to run pipeline-style computations that range over world countries rather than individual equities (with columns like "GDP" or "Median Income" or "Current Account"), and for risk analysis it would be useful to be able to compute over sector- or company-level data rather than security-level data.

API Changes

Goals

Our most immediate need is to provide users with a way to run pipelines "on a particular market". In this context, "running on a market" means three things:

  • The columns of the arrays passed to pipeline expressions should correspond to the assets trading on the market of interest.
  • The rows of the arrays passed to pipeline expressions should correspond to trading days on the market of interest.
  • Running a Pipeline should produce outputs for each trading day on the market of interest.

In the future, these notions might be generalized in at least two important ways:

  1. "Running on a market" can be generalized to "running on multiple markets" by unioning the relevant sets of columns and rows.
  2. "Running on a market" can be generalized to sets of column labels other than equities.

Both of these generalizations are out of scope for the current proposal, but we'd like to preserve the possibility of making such changes in the future.

Concrete Changes

In service of the above goals, this document proposes the following changes to the existing Pipeline API:

Domain Objects

  • Add a new zipline module, zipline.pipeline.domain, containing a new Domain class. A domain represents a statically-known (but possibly dynamic-over-time) set of column labels for the arrays passed to CustomFactor.compute (and other pipeline API machinery). A domain has an associated calendar, which determines the row labels for arrays passed to CustomFactor.compute.
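A minimal sketch of the information a Domain object might carry (the attribute names here are assumptions for illustration, not the final API): a country code identifying the asset universe, and a trading calendar determining row labels.

```python
from collections import namedtuple

# Hypothetical shape of a Domain: which assets (by country) and which
# calendar (by name) a pipeline computes over. Real Domain objects would
# likely be richer than a namedtuple.
Domain = namedtuple("Domain", ["country_code", "calendar_name"])

US_EQUITIES = Domain(country_code="US", calendar_name="NYSE")
AU_EQUITIES = Domain(country_code="AU", calendar_name="ASX")
```

Under this sketch, running a pipeline on AU_EQUITIES would select Australian assets as column labels and ASX sessions as row labels.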

Generics, Specialization, and Domain Inference

Some datasets (and, transitively, terms that depend only on those datasets) are naturally "generic" over multiple domains. For example, the current USEquityPricing simply has open, high, low, close, and volume columns, which naturally extend to a daily pricing dataset for any market of interest.

On the other hand, some pipeline expressions really are tied to a specific market. The most common case for this kind of domain-specificity is simply that many datasets are only available for assets trading on a specific market. A more interesting example, however, is that a factor that implements "beta" in terms of a linear regression against SPY returns only makes sense for the US equity market.

This document proposes to support both generic and domain-specific terms as follows:

  • A DataSet may define an explicit domain as a class-level attribute. Doing so indicates that the dataset can only be used in Pipelines running on the dataset's domain. DataSets that do not define a domain will implicitly receive a domain of NotSpecified. Such a dataset is referred to as a "Generic dataset" and can (at least in principle) be used in a Pipeline of any domain.
  • A DataSet's columns automatically inherit the domain of their parent dataset. There is no supported mechanism for constructing a dataset with columns of incompatible domains.
  • A ComputableTerm may define an explicit domain as a class-level attribute, or one may be passed as a runtime parameter. Doing so indicates that the term in question can only be used in Pipelines running on the specified domain. If no such domain is provided, a domain is inferred using the following algorithm:
    • If all input terms have a domain of NotSpecified, the result is also NotSpecified.
    • If there is exactly one non-NotSpecified domain in the input terms, the result is that domain.
    • Otherwise, an error is raised.
      In practice, we expect almost all ComputableTerms to inherit domains transitively from their inputs.
  • A Pipeline can be given an explicit domain by passing one to a new (optional) domain parameter in the constructor of Pipeline. If a domain is not provided, a domain is inferred by applying the same domain inference algorithm outlined above to the outputs and screen of the pipeline. If the domain provided to the Pipeline constructor does not match the domain computed by inference, and neither domain is NotSpecified, an error is also raised.
  • Any generic dataset can be specialized to a concrete domain via a new specialize classmethod. Specializing a generic dataset to a concrete domain creates an identical copy of the generic dataset, but with the domain of the dataset (and, by extension, all of its columns) updated to the concrete domain. For example, the existing USEquityPricing dataset will be replaced with a new, generic, EquityPricing dataset, and USEquityPricing will become an alias of EquityPricing.specialize(USEquities). Supporting specialization in this form allows us to maintain backwards compatibility with most existing usage. It also allows advanced users to register different pipeline loaders for different dataset specializations (e.g., a user with different databases for US pricing and Europe pricing can register different loaders for different specializations of EquityPricing).
  • Any column of a generic dataset can be specialized to a concrete domain. Column specialization obeys the law that DataSet.column.specialize(domain) is DataSet.specialize(domain).column.
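The inference algorithm described above is small enough to sketch directly. `NOT_SPECIFIED` here is a stand-in sentinel for zipline's NotSpecified, and the deduplication via a set reflects one reasonable reading of "exactly one non-NotSpecified domain":

```python
# Stand-in for zipline's NotSpecified sentinel.
NOT_SPECIFIED = object()

def infer_domain(domains):
    """Infer a single domain from the domains of a term's inputs.

    - All inputs NotSpecified      -> NotSpecified (the term stays generic).
    - Exactly one concrete domain  -> that domain.
    - Conflicting concrete domains -> error.
    """
    concrete = {d for d in domains if d is not NOT_SPECIFIED}
    if not concrete:
        return NOT_SPECIFIED
    if len(concrete) == 1:
        return concrete.pop()
    raise ValueError("Conflicting domains: %r" % sorted(concrete))
```

Applying the same function to a pipeline's outputs and screen gives the pipeline-level inference described in the next bullet's Pipeline constructor behavior.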

Changes to Pipeline Execution Semantics

Given a pipeline, a start_date, and an end_date, we currently use the following (slightly abridged) algorithm to execute pipelines.

Steps in bold below will require substantial changes to support domains.

Current Algorithm

  1. Build a dependency graph of all terms in the pipeline to be executed.
  2. For each term in the graph, compute the number of extra rows required beyond start_date to service all downstream consumers of the term. For example, if Term B is a 30-day moving average of Term A, then we need to compute 29 extra rows of Term A in order to provide all the necessary inputs to Term B.
  3. **Determine the start_date and end_date of each term by resolving the extra rows computed in (2) against a predetermined timeseries of output labels. Today, in practice, this timeseries is always the NYSE calendar.**
  4. **Compute the lifetimes matrix, a boolean array of shape (dates x assets) indicating which assets were alive on each date appearing in a term's inputs. The date labels of the lifetimes matrix use the same (implicitly NYSE) labels used in (3).**
  5. Iterate over the graph in topological order, dispatching LoadableTerms to registered pipeline loaders, and executing ComputableTerms by passing them their inputs.
  6. Extract the pipeline's output columns and stack them into a multi-indexed DataFrame indexed by the (date, asset) pairs where the pipeline's screen is True. The default screen is the lifetimes matrix computed in (4).
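The extra-rows arithmetic in step (2) composes additively down a chain of dependencies; a toy sketch of the linear-chain case (the real graph computation handles arbitrary DAGs, not just chains):

```python
# Extra rows needed before start_date at the bottom of a linear chain of
# terms: each consumer with window_length W needs W - 1 rows of its input,
# and the requirements accumulate down the chain.
def extra_rows(window_lengths):
    return sum(w - 1 for w in window_lengths)

# A 30-day average needs 29 extra rows of its input (the example above);
# a 30-day average of a 5-day average needs (30 - 1) + (5 - 1) = 33.
extra_rows([30])     # -> 29
extra_rows([30, 5])  # -> 33
```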

Changes

  • Before step (1), compute the domain of the pipeline to be executed using the inference rules outlined in Generics, Specialization, and Domain Inference above.
  • In step (3), use the calendar of the computed domain when resolving extra rows to concrete date ranges.
  • In step (4), use the country code of the computed domain to determine the assets to use as column labels of the lifetimes matrix.
  • In step (5), specialize LoadableTerms to the computed domain before requesting data from PipelineLoaders.
