 Pandas DataFrame accessor that provides semantic operations using large language models.

 This accessor adds semantic capabilities to pandas DataFrames through the .semantic namespace,
-enabling LLM-powered operations like mapping, filtering, merging, and aggregation.
+enabling LLM-powered operations like mapping, filtering, merging, aggregation, splitting, gathering, and unnesting.

 Basic Usage:
 >>> import pandas as pd
@@ ... @@
 - Filter Operation: https://ucbepic.github.io/docetl/operators/filter/
 - Resolve Operation: https://ucbepic.github.io/docetl/operators/resolve/
 - Reduce Operation: https://ucbepic.github.io/docetl/operators/reduce/
+- Split Operation: https://ucbepic.github.io/docetl/operators/split/
+- Gather Operation: https://ucbepic.github.io/docetl/operators/gather/
+- Unnest Operation: https://ucbepic.github.io/docetl/operators/unnest/

 Cost Tracking:
 All operations track their LLM usage costs:
@@ ... @@
 from docetl.operations.equijoin import EquijoinOperation
 from docetl.operations.filter import FilterOperation
+from docetl.operations.gather import GatherOperation
 from docetl.operations.map import MapOperation
 from docetl.operations.reduce import ReduceOperation
 from docetl.operations.resolve import ResolveOperation
+from docetl.operations.split import SplitOperation
+from docetl.operations.unnest import UnnestOperation
 from docetl.optimizer import Optimizer
 from docetl.optimizers.join_optimizer import JoinOptimizer
 from docetl.runner import DSLRunner
@@ ... @@
 class OpHistory(NamedTuple):
     """Record of an operation that was run."""

-    op_type: str  # 'map', 'filter', 'merge', 'agg'
+    op_type: str  # 'map', 'filter', 'merge', 'agg', 'split', 'gather', 'unnest'
     config: Dict[str, Any]  # Full config used
     output_columns: List[str]  # Columns created/modified
@@ -644,6 +650,234 @@ def filter(

         return self._record_operation(results, "filter", filter_config, cost)

+    def split(
+        self,
+        split_key: str,
+        method: str,
+        method_kwargs: Dict[str, Any],
+        **kwargs
+    ) -> pd.DataFrame:
+        """
+        Split DataFrame rows into multiple chunks based on content.
+
+        Documentation: https://ucbepic.github.io/docetl/operators/split/
+
+        Args:
+            split_key: The column containing content to split
+            method: Splitting method, either "token_count" or "delimiter"
+            method_kwargs: Dictionary containing method-specific parameters:
+                - For "token_count": {"num_tokens": int, "model": str (optional)}
+                - For "delimiter": {"delimiter": str, "num_splits_to_group": int (optional)}
+            **kwargs: Additional configuration options:
+                - model: LLM model to use for tokenization (default: from config)
+
+        Returns:
+            pd.DataFrame: DataFrame with split content, including:
+                - {split_key}_chunk: The content of each chunk
+                - {operation_name}_id: Unique identifier for the original document
+                - {operation_name}_chunk_num: Sequential chunk number within the document
+
+        Examples:
+            >>> # Split by token count
+            >>> df.semantic.split(
+            ...     split_key="content",
+            ...     method="token_count",
+            ...     method_kwargs={"num_tokens": 100}
+            ... )
+
+            >>> # Split by delimiter
+            >>> df.semantic.split(
+            ...     split_key="text",
+            ...     method="delimiter",
+            ...     method_kwargs={"delimiter": "\n\n", "num_splits_to_group": 2}
+            ... )
+        """
+        # Convert DataFrame to list of dicts
+        input_data = self._df.to_dict("records")
+
+        # Create split operation config
+        split_config = {
+            "type": "split",
+            "name": f"semantic_split_{len(self._history)}",
+            "split_key": split_key,
+            "method": method,
+            "method_kwargs": method_kwargs,
+            **kwargs,
+        }
+
+        # Create and execute split operation
+        split_op = SplitOperation(
+            runner=self.runner,
+            config=split_config,
+            default_model=self.runner.config["default_model"],
+            max_threads=self.runner.max_threads,
+            console=self.runner.console,
+            status=self.runner.status,
+        )
+        results, cost = split_op.execute(input_data)
+
+        return self._record_operation(results, "split", split_config, cost)
+
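In practice the new method reads naturally on a plain DataFrame. A minimal usage sketch (the `report` column name and the 500-token chunk size are illustrative, not part of this change):

    >>> import pandas as pd
    >>> df = pd.DataFrame({"report": [long_text_1, long_text_2]})  # long_text_* are placeholder strings
    >>> chunks = df.semantic.split(
    ...     split_key="report",
    ...     method="token_count",
    ...     method_kwargs={"num_tokens": 500},
    ... )
    >>> # Per the Returns section above, each row of `chunks` should carry report_chunk
    >>> # plus the auto-generated {operation_name}_id and {operation_name}_chunk_num columns.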
+    def gather(
+        self,
+        content_key: str,
+        doc_id_key: str,
+        order_key: str,
+        peripheral_chunks: Optional[Dict[str, Any]] = None,
+        **kwargs
+    ) -> pd.DataFrame:
+        """
+        Gather contextual information from surrounding chunks to enhance each chunk.
+
+        Documentation: https://ucbepic.github.io/docetl/operators/gather/
+
+        Args:
+            content_key: The column containing the main content to be enhanced
+            doc_id_key: The column containing document identifiers to group chunks
+            order_key: The column containing chunk order numbers within documents
+            peripheral_chunks: Configuration for surrounding context:
+                - previous: {"head": {"count": int}, "tail": {"count": int}, "middle": {}}
+                - next: {"head": {"count": int}, "tail": {"count": int}, "middle": {}}
+            **kwargs: Additional configuration options:
+                - main_chunk_start: Start marker for main chunk (default: "--- Begin Main Chunk ---")
+                - main_chunk_end: End marker for main chunk (default: "--- End Main Chunk ---")
+                - doc_header_key: Column containing document headers (optional)
+
+        Returns:
+            pd.DataFrame: DataFrame with enhanced content, including:
+                - {content_key}_rendered: The main content with surrounding context
+
+        Examples:
+            >>> # Basic gathering with surrounding context
+            >>> df.semantic.gather(
+            ...     content_key="chunk_content",
+            ...     doc_id_key="document_id",
+            ...     order_key="chunk_number",
+            ...     peripheral_chunks={
+            ...         "previous": {"head": {"count": 2}, "tail": {"count": 1}},
+            ...         "next": {"head": {"count": 1}, "tail": {"count": 2}}
+            ...     }
+            ... )
+
+            >>> # Simple gathering without peripheral chunks
+            >>> df.semantic.gather(
+            ...     content_key="content",
+            ...     doc_id_key="doc_id",
+            ...     order_key="order"
+            ... )
+        """
+        # Convert DataFrame to list of dicts
+        input_data = self._df.to_dict("records")
+
+        # Create gather operation config
+        gather_config = {
+            "type": "gather",
+            "name": f"semantic_gather_{len(self._history)}",
+            "content_key": content_key,
+            "doc_id_key": doc_id_key,
+            "order_key": order_key,
+            **kwargs,
+        }
+
+        # Add peripheral_chunks config if provided
+        if peripheral_chunks is not None:
+            gather_config["peripheral_chunks"] = peripheral_chunks
+
+        # Create and execute gather operation
+        gather_op = GatherOperation(
+            runner=self.runner,
+            config=gather_config,
+            default_model=self.runner.config["default_model"],
+            max_threads=self.runner.max_threads,
+            console=self.runner.console,
+            status=self.runner.status,
+        )
+        results, cost = gather_op.execute(input_data)
+
+        return self._record_operation(results, "gather", gather_config, cost)
+
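Because gather keys off the document-id and order columns that split emits, the two methods chain naturally. A sketch continuing the split example above; it assumes split was the first semantic operation on the frame, so its auto-generated name is semantic_split_0 and its id/order columns are named accordingly, and it assumes the peripheral_chunks sub-sections may be omitted when not needed:

    >>> gathered = chunks.semantic.gather(
    ...     content_key="report_chunk",
    ...     doc_id_key="semantic_split_0_id",
    ...     order_key="semantic_split_0_chunk_num",
    ...     peripheral_chunks={"previous": {"tail": {"count": 1}}},
    ... )
    >>> # Per the docstring, report_chunk_rendered should now hold each chunk plus the
    >>> # immediately preceding chunk as context.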
+    def unnest(
+        self,
+        unnest_key: str,
+        keep_empty: bool = False,
+        expand_fields: Optional[List[str]] = None,
+        recursive: bool = False,
+        depth: Optional[int] = None,
+        **kwargs
+    ) -> pd.DataFrame:
+        """
+        Unnest list-like values into multiple rows, or expand dictionary values into their parent rows.
+
+        Documentation: https://ucbepic.github.io/docetl/operators/unnest/
+
+        Args:
+            unnest_key: The column containing list-like or dictionary values to unnest
+            keep_empty: Whether to keep rows with empty/null values (default: False)
+            expand_fields: For dictionary values, which fields to expand (default: all)
+            recursive: Whether to recursively unnest nested structures (default: False)
+            depth: Maximum depth for recursive unnesting (default: 1, or unlimited if recursive=True)
+            **kwargs: Additional configuration options
+
+        Returns:
+            pd.DataFrame: DataFrame with unnested values, where:
+                - For lists: Each list element becomes a separate row
+                - For dicts: Specified fields are expanded into the parent row
+
+        Examples:
+            >>> # Unnest a list column
+            >>> df.semantic.unnest(
+            ...     unnest_key="tags"
+            ... )
+            # Input: [{"id": 1, "tags": ["a", "b"]}]
+            # Output: [{"id": 1, "tags": "a"}, {"id": 1, "tags": "b"}]
+
+            >>> # Unnest a dictionary column with specific fields
+            >>> df.semantic.unnest(
+            ...     unnest_key="user_info",
+            ...     expand_fields=["name", "age"]
+            ... )
+            # Input: [{"id": 1, "user_info": {"name": "Alice", "age": 30, "email": "[email protected]"}}]
+            # Output: [{"id": 1, "user_info": {...}, "name": "Alice", "age": 30}]
+
+            >>> # Recursive unnesting
+            >>> df.semantic.unnest(
+            ...     unnest_key="nested_lists",
+            ...     recursive=True,
+            ...     depth=2
+            ... )
+        """
+        # Convert DataFrame to list of dicts
+        input_data = self._df.to_dict("records")
+
+        # Create unnest operation config
+        unnest_config = {
+            "type": "unnest",
+            "name": f"semantic_unnest_{len(self._history)}",
+            "unnest_key": unnest_key,
+            "keep_empty": keep_empty,
+            "recursive": recursive,
+            **kwargs,
+        }
+
+        # Add optional parameters if provided
+        if expand_fields is not None:
+            unnest_config["expand_fields"] = expand_fields
+        if depth is not None:
+            unnest_config["depth"] = depth
+
+        # Create and execute unnest operation
+        unnest_op = UnnestOperation(
+            runner=self.runner,
+            config=unnest_config,
+            default_model=self.runner.config["default_model"],
+            max_threads=self.runner.max_threads,
+            console=self.runner.console,
+            status=self.runner.status,
+        )
+        results, cost = unnest_op.execute(input_data)
+
+        return self._record_operation(results, "unnest", unnest_config, cost)
+
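One behavior worth spelling out, since the docstring only states it in passing: keep_empty decides whether rows whose unnest_key is empty survive the operation. A toy sketch, with the expected behavior inferred from the flag's description above:

    >>> df = pd.DataFrame({"id": [1, 2], "tags": [["a", "b"], []]})
    >>> df.semantic.unnest(unnest_key="tags")                    # the empty-tags row should be dropped
    >>> df.semantic.unnest(unnest_key="tags", keep_empty=True)   # the empty-tags row should be retained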
     @property
     def total_cost(self) -> float:
         """
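Taken together with the cost tracking that already backs map and filter, a full chunking pass might look like the sketch below. It assumes, consistent with how _record_operation is used elsewhere in this file, that the returned frame carries the accumulated operation history; and since split, gather, and unnest make no LLM calls, their recorded cost should normally be 0.0:

    >>> gathered = (
    ...     df.semantic.split(
    ...         split_key="report",
    ...         method="token_count",
    ...         method_kwargs={"num_tokens": 500},
    ...     )
    ...     .semantic.gather(
    ...         content_key="report_chunk",
    ...         doc_id_key="semantic_split_0_id",
    ...         order_key="semantic_split_0_chunk_num",
    ...     )
    ... )
    >>> gathered.semantic.total_cost  # expected to stay at 0.0 until an LLM-backed op (e.g., map) runs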