Semantic Layer

A SemanticLayer object in Relta represents the metrics that users can ask questions about for a given DataSource.

Structure

A metric can be thought of as a SQL view over the tables of the DataSource. A metric definition contains dimensions, which represent the columns of the view, and measures, which represent the aggregate functions that can be applied to the dimensions.

An example of a metric is:

{
  "name": "carrier_route_analysis",
  "description": "This metric provides the count of routes per carrier and identifies the most popular route for each carrier.",
  "datasource": "db",
  "dimensions": [
    {
      "name": "carrier",
      "description": "The code of the carrier operating the flight.",
      "categories": null,
      "dtype": "VARCHAR"
    },
    {
      "name": "carrier_name",
      "description": "The full name of the carrier.",
      "categories": null,
      "dtype": "VARCHAR"
    },
    {
      "name": "origin",
      "description": "The origin airport code for the flight.",
      "categories": null,
      "dtype": "VARCHAR"
    },
    {
      "name": "destination",
      "description": "The destination airport code for the flight.",
      "categories": null,
      "dtype": "VARCHAR"
    }
  ],
  "measures": [
    {
      "name": "route_count",
      "description": "The total number of routes flown by the carrier.",
      "expr": "COUNT(DISTINCT origin || '-' || destination)"
    }
  ],
  "sample_questions": [
    "Give me the full names of the top 5 carriers that flew the most routes and what their most popular route is."
  ],
  "sql_to_underlying_datasource": "SELECT f.carrier, c.name AS carrier_name, f.origin, f.destination FROM public.flights f JOIN public.carriers c ON f.carrier = c.code"
}

The sql_to_underlying_datasource field is the SELECT statement that would appear in a CREATE VIEW statement; it can contain arbitrary SQL over the underlying tables.
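
Conceptually, Relta exposes the metric as a view. A sketch of the equivalent CREATE VIEW statement for the metric above (the actual view name and creation are handled internally):

CREATE VIEW carrier_route_analysis AS
SELECT f.carrier,
       c.name AS carrier_name,
       f.origin,
       f.destination
FROM   public.flights f
JOIN   public.carriers c ON f.carrier = c.code;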

The SQL Agent is given the SemanticLayer, which contains the list of metrics, and it only uses dimensions and measures that are defined in the SemanticLayer.

An example of SQL generated from that metric is:

-- What airline flies the least routes?
SELECT carrier_name,
       COUNT(DISTINCT origin || '-' || destination) as route_count
FROM   carrier_route_analysis
GROUP BY carrier_name
ORDER BY route_count ASC LIMIT 1;

Observe that the SQL Agent only uses the dimensions and measures defined in the metric, and the metric itself is the view that data is pulled from. In fact, the SQL Agent does not have access to the original tables or columns, nor even to the CREATE VIEW statement behind the view -- it only has the dimensions and measures of the metric. So the SQL Agent cannot work with any data not declared in the metric.

Usage

A SemanticLayer object is automatically created when a DataSource object is created (either by getting a DataSource from disk or creating a new one). It is accessible from the semantic_layer property of a DataSource.

By default, the SemanticLayer automatically loads metrics that have been previously defined in its directory, which defaults to .relta/semantic_layer/{datasource_name}.

import relta

rc = relta.Client()
source = rc.get_datasource("flights")
print(source.semantic_layer.metrics) # [Metric(name="most_frequent_airlines_by_airport", ...), ...]

To generate metrics from scratch for a semantic layer, use the propose method. We cover how to do this with the CLI in Getting Started, but present it here using library code. Observe that we only pass natural language questions, and optionally business context, to the propose method.

import os

import relta

rc = relta.Client()
source = rc.create_datasource(os.environ["PG_AIRPORTS_DSN"], "airports")
source.semantic_layer.propose([
    "Which airport has the longest security line?",
    "Which airports have CLEAR?",
    "Which airport should I go to if I'm flying to LA from NY, if I want to avoid cost?"
])

If you want to edit a metric, you can do it programmatically or by editing the metric's YAML file on disk (JSON files are supported but deprecated).
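
A minimal sketch of a programmatic edit, using the carrier_route_analysis metric from above (metrics are Pydantic models, so fields can be assigned directly):

import relta

rc = relta.Client()
source = rc.get_datasource("flights")
layer = source.semantic_layer

# find the metric and sharpen its description
metric = next(m for m in layer.metrics if m.name == "carrier_route_analysis")
metric.description = "Counts routes per carrier and identifies each carrier's most popular route."

layer.dump()  # persist the edit to .relta/semantic_layer/flights/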

Refining Metrics

If a user gives feedback on a response, the relevant Response is added to the feedback_responses list of the SemanticLayer, which you can then call refine on to suggest updates to the semantic layer:

import relta

rc = relta.Client()
source = rc.get_datasource("airports")

chat = rc.create_chat(source)
resp = chat.prompt("which airport is the biggest?")
resp.feedback("negative", "please add support for airport size and passenger volume")

source.semantic_layer.refine(pr=True)

pr=True will automatically open a PR with the changes, provided your Configuration includes the optional GITHUB_* variables.
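
refine only updates the in-memory metrics; it returns the PR URL on success and None otherwise. A sketch of handling the result and accepting the changes locally:

url = source.semantic_layer.refine(pr=True)
if url is not None:
    print(f"Review the proposed changes at {url}")

source.semantic_layer.dump()  # accept the refined metrics on disk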

API Reference

Source code in src/relta/semantic/semantic_layer.py
class SemanticLayer:
    def __init__(
        self,
        datasource: "DataSource",  # noqa: F821 # type: ignore
        config: Configuration,
        load: bool = True,
        path: Optional[str] = None,
    ):
        from ..datasource import (
            DataSource,
        )  # this is to allow type hinting when writing code w/o a circular import (similar to other libraries)

        self._config = config
        self.datasource: DataSource = datasource
        self.path = (
            self._config.relta_semantic_layer_dir_path / self.datasource.name
            if path is None
            else Path(path)
        )
        self.update_reasoning: str = (
            ""  # semantic agent will write reasoning about the update here
        )
        self.feedback_responses = []
        self.metrics: list[Metric] = []
        self.examples: list[Example] = []
        # self.proposed_changes: list[Metric] = []
        makedirs(self.path, exist_ok=True)
        if load:
            self.load()

    def load(
        self,
        path: Optional[Union[str, Path]] = None,
        json: bool = True,
        metrics_to_load: Optional[list[str]] = None,
    ):
        """Load semantic layer.

        Changes to the metrics are not persisted on disk. Use `.dump()` to persist them.

        Args:
            path (Optional[Union[str, Path]], optional): Path to load the semantic layer. If None, uses `self.path`, which is populated on creation.
            json (bool, optional): Whether to additionally load the semantic layer from deprecated JSON files.
                If a metric exists in both JSON and YAML files by `metric.name`, the JSON metric is ignored. Defaults to True.
            metrics_to_load (Optional[list[str]], optional): List of metric names to load. If None, loads all metrics. Defaults to None.
        """
        logfire.info("loading semantic layer from {path}", path=str(path))
        p = Path(path) if path is not None else self.path
        metrics = []
        yaml_metric_names = set()
        examples = []

        for fpath in p.glob("*.yaml"):
            with open(fpath, "r") as f:
                data = yaml.safe_load(f)
                if fpath.name == "examples.yaml":
                    example_coll = ExampleCollection.model_validate(data)
                    examples.extend(example_coll.examples)
                else:
                    metric = Metric.model_validate(data)
                    if metrics_to_load is None or metric.name in metrics_to_load:
                        metrics.append(metric)
                        yaml_metric_names.add(metric.name)

        if json:
            for fpath in p.glob("*.json"):
                with open(fpath, "r") as f:
                    if fpath.name == "examples.json":
                        example_coll = ExampleCollection.model_validate_json(f.read())
                        examples.extend(example_coll.examples)
                    else:
                        metric = Metric.model_validate_json(f.read())
                        if (
                            metrics_to_load is None or metric.name in metrics_to_load
                        ) and metric.name not in yaml_metric_names:
                            metrics.append(metric)
        self.metrics = metrics
        self.examples = examples

    def dump(
        self,
        clear=True,
        path: Optional[Union[str, Path]] = None,
        # apply_proposals: bool = True,
    ):
        """Dumps the semantic layer, accepting any updates made to the semantic layer.

        Args:
            clear (bool): Delete all JSON/YAML files in the path for this layer. Defaults to True. See `path` attribute for details on the path.
            path (Optional[Union[str, Path]], optional): Path to dump the semantic layer. If None, uses `self.path`, which is populated on creation.
        """

        logfire.info("dumping semantic layer to file")
        p = Path(path) if path is not None else self.path

        if clear:
            for yaml_file in p.glob("*.yaml"):
                yaml_file.unlink()
            for json_file in p.glob("*.json"):
                json_file.unlink()

        for metric in self.metrics:
            with open(p / f"{metric.name}.yaml", "w+") as f:
                yaml.dump(metric.model_dump(), f, sort_keys=False)

        examples = ExampleCollection(examples=self.examples)
        with open(p / "examples.yaml", "w+") as f:
            yaml.dump(examples.model_dump(), f, sort_keys=False)

        # additionally, as dumping is "accepting" the changes, we clean up any updated state
        self.update_reasoning = ""

    def dumps(self, mode: Literal["json", "yaml"] = "yaml", **kwargs) -> str:
        """Dumps the metrics and examples to a JSON or YAML string. JSON is typically used for feeding into an agent and YAML for display.

        Args:
            **kwargs: Keyword arguments to pass to `pydantic.BaseModel.model_dump_json` or `yaml.dump` depending on mode.
            Will override, by individual key, `Configuration`'s default kwargs for printing JSON/YAML.

        Returns:
            str: JSON representation of the semantic layer (metrics and examples).
        """
        default_kwargs = (
            self._config.json_dumps_kwargs
            if mode == "json"
            else self._config.yaml_dumps_kwargs
        )

        default_kwargs.update(kwargs)

        ctr = SemanticLayerContainer(**vars(self))

        return (
            ctr.model_dump_json(**default_kwargs)
            if mode == "json"
            else yaml.dump(ctr.model_dump(), stream=None, **default_kwargs)
        )

    def refine(self, pr=False):
        """Refines the semantic layer based on the feedback and creates a PR with the changes. By default, sets the refined metrics, but does not persist on disk -- see `dump()` to persist.

        If `pr=True`, attempts to create a PR on the configured GitHub repo (see `Configuration`) after updating the metrics in the in-memory semantic layer.
        If it is successful, returns the URL of the PR. Else, returns `None`.
        """
        logfire.info("refine semantic layer")
        llm = ChatOpenAI(model="gpt-4o", temperature=0).with_structured_output(
            RefinedMetrics
        )
        prompt = PromptTemplate.from_template(REFINE_SEMANTIC_LAYER_PROMPT)

        chain = prompt | llm

        feedback = [
            Feedback(
                sentiment=r.feedback_sentiment,
                reason=r.feedback_reason,
                selected_response=r.message,
                message_history=r.chat._get_messages(),
            )
            for r in self.feedback_responses
        ]

        result: RefinedMetrics = chain.invoke(
            {
                "METRIC_MODEL": json.dumps(
                    Metric.model_json_schema(mode="serialization"), indent=2
                ),
                "METRICS": json.dumps(
                    [metric.model_dump() for metric in self.metrics], indent=2
                ),
                "FEEDBACK": json.dumps(
                    [feedback.dict() for feedback in feedback], indent=2
                ),
                "DDL": self.datasource._get_ddl(),
            }
        )

        existing_metrics = {m.name: m for m in self.metrics}
        refined_metrics = {m.original_name: m.updated_metric for m in result.metrics}

        for name, refined_metric in refined_metrics.items():
            if name in existing_metrics:
                existing_metric = existing_metrics[name]
                print(f"Metric: {name}")

                for field in Metric.model_fields:
                    refined_value = getattr(refined_metric, field)
                    existing_value = getattr(existing_metric, field)
                    if refined_value != existing_value:
                        print(f"  {field}:")
                        print(f"    - Old: {existing_value}")
                        print(f"    + New: {refined_value}")

                        # Handle list fields like dimensions, measures, sample_questions
                        if isinstance(refined_value, list):
                            refined_set = set(str(x) for x in refined_value)
                            existing_set = set(str(x) for x in existing_value)

                            removed = existing_set - refined_set
                            added = refined_set - existing_set

                            if removed:
                                print("    Removed items:")
                                for item in removed:
                                    print(f"      - {item}")

                            if added:
                                print("    Added items:")
                                for item in added:
                                    print(f"      + {item}")

                print()
            else:
                print(f"New Metric: {name}")
                print(f"  + {refined_metric.model_dump_json(indent=2)}")
                print()

        # for metric_container in result.metrics:
        #     self.proposed_changes.append(metric_container.updated_metric)
        self.metrics = [m.updated_metric for m in result.metrics]

        if pr:
            # Create a new branch and open a PR with the refined metrics
            res = self._create_pr_with_refined_metrics(
                [update.updated_metric for update in result.metrics]
            )
            if res:
                return res
            else:
                print("Failed to create a PR.")
                return None
        else:
            print("Not creating a PR.")

    def _create_pr_with_refined_metrics(
        self, refined_metrics: list[Metric]
    ) -> Optional[str]:
        """Creates a new branch with refined metrics and opens a PR. Returns URL of PR if successful, else None."""
        try:
            g = Github(self._config.github_token)
            repo = g.get_repo(self._config.github_repo)

            # Create a new branch
            base_branch = repo.get_branch(self._config.github_base_branch)
            branch_name = f"refined-metrics-{uuid.uuid4().hex[:8]}"
            repo.create_git_ref(f"refs/heads/{branch_name}", base_branch.commit.sha)

            # Update metrics files in the new branch
            for metric in refined_metrics:
                file_path = f"{self._config.relta_semantic_layer_dir_path}/{self.datasource.name}/{metric.name}.json"
                content = metric.model_dump_json(indent=2)

                try:
                    file = repo.get_contents(file_path, ref=branch_name)
                    repo.update_file(
                        file_path,
                        f"Update {metric.name} metric",
                        content,
                        file.sha,
                        branch=branch_name,
                    )
                except GithubException:
                    repo.create_file(
                        file_path,
                        f"Add {metric.name} metric",
                        content,
                        branch=branch_name,
                    )

            # Create a pull request
            pr_title = f"Refined metrics for {self.datasource.name}"
            pr_body = "This PR contains refined metrics based on user feedback."
            pr = repo.create_pull(
                title=pr_title,
                body=pr_body,
                head=branch_name,
                base=self._config.github_base_branch,
            )

            print(
                f"Created a pull request with refined metrics: {pr_title}, id: {pr.id}, url: {pr.html_url}"
            )
            return pr.html_url
        except Exception as e:
            print(f"Error creating PR with refined metrics: {str(e)}")
            return None

    def copy(
        self,
        source: "DataSource",  # noqa: F821
        dump: bool = True,
    ):
        """Copy the semantic layer from another DataSource.

        Args:
            source (DataSource): `DataSource` object to copy the semantic layer from.
            # from_path (Optional[Union[str, Path]], optional): Path to load the semantic layer from, ignoring `source`. If None, uses `source`'s semantic layer. Defaults to None.
            dump (bool, optional): Whether to dump the semantic layer to its path. Defaults to True.
        """
        # if from_path is None:
        self.metrics = [
            metric.model_copy(deep=True) for metric in source.semantic_layer.metrics
        ]
        self.examples = [
            example.model_copy(deep=True) for example in source.semantic_layer.examples
        ]
        # else:
        #     self.load(from_path)

        if dump:
            self.dump()

    def propose(
        self,
        queries: list[str],
        context: Optional[str] = None,
    ):
        """Proposes a new semantic layer for the given datasource and natural language queries.

        Args:
            queries (list[str]): A list of natural language queries that the semantic layer should answer.
            context (Optional[str], optional): Extra information about the datasource. Defaults to None.
        """
        logfire.span("proposing new semantic layer")
        proposed_metrics = self._generate_proposed_metrics(
            [self.datasource._get_ddl()],
            queries,
            self.datasource.name,
            context,
        )
        for m in proposed_metrics.metrics:
            if m.name.lower() == "example":
                m.name = "example_ds"
                logger.info(
                    "Renamed metric 'example' to 'example_ds' to avoid collision with few shot examples."
                )

        self.metrics = proposed_metrics.metrics
        logfire.info(
            "{num_metrics} metrics proposed in semantic layer",
            num_metrics=len(self.metrics),
        )

    def show(self):
        """Prints table of metrics."""
        raise NotImplementedError()

    def _update(self, container: SemanticLayerContainer):
        self.metrics = container.metrics
        self.examples = container.examples
        self.update_reasoning = container.update_reasoning

    @staticmethod
    def _generate_proposed_metrics(
        ddl: list[str],
        questions: list[str],
        source_name: str,
        context: Optional[str] = None,
    ) -> ProposedMetrics:
        """Generates a list of metrics for the given datasource and natural language queries.

        Args:
            ddl (list[str]): The DDL for the datasource.
            questions (list[str]): A list of natural language queries that the semantic layer should answer.
            source_name (str): The name of the datasource.
            context (str, optional): Extra information about the datasource. Defaults to None.
        """
        llm = ChatOpenAI(
            model="gpt-4o-2024-08-06", temperature=0
        ).with_structured_output(ProposedMetrics)
        prompt = PromptTemplate.from_template(SYSTEM_PROMPT_SEMANTIC_LAYER_BUILDER)

        chain = prompt | llm  # | parser
        result: ProposedMetrics = chain.invoke(
            {
                "QUESTIONS": "\n".join(questions),
                "DDL": "\n".join(ddl),
                "CONTEXT": context,
                "DATASOURCE_NAME": source_name,
            }
        )

        return result

copy(source, dump=True)

Copy the semantic layer from another DataSource.

Parameters:

source (DataSource, required): DataSource object to copy the semantic layer from.
dump (bool, default True): Whether to dump the semantic layer to its path.
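
A usage sketch (the staging DSN and datasource name are hypothetical):

import os

import relta

rc = relta.Client()
flights = rc.get_datasource("flights")

# hypothetical second datasource over the same schema
staging = rc.create_datasource(os.environ["PG_STAGING_DSN"], "flights_staging")
staging.semantic_layer.copy(flights)  # copies metrics and examples, then dumps to disk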

dump(clear=True, path=None)

Dumps the semantic layer, accepting any updates made to the semantic layer.

Parameters:

clear (bool, default True): Delete all JSON/YAML files in the path for this layer. See the path attribute for details on the path.
path (Optional[Union[str, Path]], default None): Path to dump the semantic layer. If None, uses self.path, which is populated on creation.
Source code in src/relta/semantic/semantic_layer.py
def dump(
    self,
    clear=True,
    path: Optional[Union[str, Path]] = None,
    # apply_proposals: bool = True,
):
    """Dumps the semantic layer, accepting any updates made to the semantic layer.

    Args:
        clear (bool): Delete all JSON/YAML files in the path for this layer. Defaults to True. See `path` attribute for details on the path.
        path (Optional[Union[str, Path]], optional): Path to dump the semantic layer. If None, uses `self.path`, which is populated on creation.
    """

    logfire.info("dumping semantic layer to file")
    p = path if path is not None else self.path

    if clear:
        for yaml_file in p.glob("*.yaml"):
            yaml_file.unlink()
        for json_file in p.glob("*.json"):
            json_file.unlink()

    for metric in self.metrics:
        with open(p / f"{metric.name}.yaml", "w+") as f:
            yaml.dump(metric.model_dump(), f, sort_keys=False)

    examples = ExampleCollection(examples=self.examples)
    with open(p / "examples.yaml", "w+") as f:
        yaml.dump(examples.model_dump(), f, sort_keys=False)

    # additionally, as dumping is "accepting" the changes, we clean up any updated state
    self.update_reasoning = ""
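
A sketch of the files dump() writes, assuming the flights layer contains the carrier_route_analysis metric from above and the default path:

source.semantic_layer.dump()
# .relta/semantic_layer/flights/
#     carrier_route_analysis.yaml    (one YAML file per metric)
#     examples.yaml                  (the persisted examples)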

dumps(mode='yaml', **kwargs)

Dumps the metrics and examples to a JSON or YAML string. JSON is typically used for feeding into an agent and YAML for display.

Parameters:

mode (Literal["json", "yaml"], default "yaml"): Output format.
**kwargs: Keyword arguments to pass to pydantic.BaseModel.model_dump_json or yaml.dump, depending on mode. Will override, by individual key, Configuration's default kwargs for printing JSON/YAML.

Returns:

str: JSON or YAML representation of the semantic layer (metrics and examples).

Source code in src/relta/semantic/semantic_layer.py
def dumps(self, mode: Literal["json", "yaml"] = "yaml", **kwargs) -> str:
    """Dumps the metrics and examples to a JSON or YAML string. JSON is typically used for feeding into an agent and YAML for display.

    Args:
        **kwargs: Keyword arguments to pass to `pydantic.BaseModel.model_dump_json` or `yaml.dump` depending on mode.
        Will override, by individual key, `Configuration. default kwargs for printing JSON/YAML.

    Returns:
        str: JSON representation of the semantic layer (metrics and examples).
    """
    default_kwargs = (
        self._config.json_dumps_kwargs
        if mode == "json"
        else self._config.yaml_dumps_kwargs
    )

    default_kwargs.update(kwargs)

    ctr = SemanticLayerContainer(**vars(self))

    return (
        ctr.model_dump_json(**default_kwargs)
        if mode == "json"
        else yaml.dump(ctr.model_dump(), stream=None, **default_kwargs)
    )
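A usage sketch:

print(source.semantic_layer.dumps())           # YAML string, for display
payload = source.semantic_layer.dumps("json")  # JSON string, e.g. for feeding an agent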

load(path=None, json=True, metrics_to_load=None)

Load semantic layer.

Changes to the metrics are not persisted on disk. Use .dump() to persist them.

Parameters:

path (Optional[Union[str, Path]], default None): Path to load the semantic layer. If None, uses self.path, which is populated on creation.
json (bool, default True): Whether to additionally load the semantic layer from deprecated JSON files. If a metric exists in both JSON and YAML files by metric.name, the JSON metric is ignored.
metrics_to_load (Optional[list[str]], default None): List of metric names to load. If None, loads all metrics.
Source code in src/relta/semantic/semantic_layer.py
def load(
    self,
    path: Optional[Union[str, Path]] = None,
    json: bool = True,
    metrics_to_load: Optional[list[str]] = None,
):
    """Load semantic layer.

    Changes to the metrics are not persisted on disk. Use `.dump()` to persist them.

    Args:
        path (Optional[Union[str, Path]], optional): Path to load the semantic layer. If None, uses `self.path`, which is populated on creation.
        json (bool, optional): Whether to additionally load the semantic layer from deprecated JSON files.
            If a metric exists in both JSON and YAML files by `metric.name`, the JSON metric is ignored. Defaults to True.
        metrics_to_load (Optional[list[str]], optional): List of metric names to load. If None, loads all metrics. Defaults to None.
    """
    logfire.info("loading semantic layer from {path}", path=str(path))
    p = Path(path) if path is not None else self.path
    metrics = []
    yaml_metric_names = set()
    examples = []

    for fpath in p.glob("*.yaml"):
        with open(fpath, "r") as f:
            data = yaml.safe_load(f)
            if fpath.name == "examples.yaml":
                example_coll = ExampleCollection.model_validate(data)
                examples.extend(example_coll.examples)
            else:
                metric = Metric.model_validate(data)
                if metrics_to_load is None or metric.name in metrics_to_load:
                    metrics.append(metric)
                    yaml_metric_names.add(metric.name)

    if json:
        for fpath in p.glob("*.json"):
            with open(fpath, "r") as f:
                if fpath.name == "examples.json":
                    example_coll = ExampleCollection.model_validate_json(f.read())
                    examples.extend(example_coll.examples)
                else:
                    metric = Metric.model_validate_json(f.read())
                    if (
                        metrics_to_load is None or metric.name in metrics_to_load
                    ) and metric.name not in yaml_metric_names:
                        metrics.append(metric)
    self.metrics = metrics
    self.examples = examples
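
A usage sketch:

# reload a single metric from the default directory, skipping all others
source.semantic_layer.load(metrics_to_load=["carrier_route_analysis"])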

propose(queries, context=None)

Proposes a new semantic layer for the given datasource and natural language queries.

Parameters:

queries (list[str], required): A list of natural language queries that the semantic layer should answer.
context (Optional[str], default None): Extra information about the datasource.
Source code in src/relta/semantic/semantic_layer.py
def propose(
    self,
    queries: list[str],
    context: Optional[str] = None,
):
    """Proposes a new semantic layer for the given datasource and natural language queries.

    Args:
        queries (list[str]): A list of natural language queries that the semantic layer should answer.
        context (Optional[str], optional): Extra information about the datasource. Defaults to None.
    """
    logfire.span("proposing new semantic layer")
    proposed_metrics = self._generate_proposed_metrics(
        [self.datasource._get_ddl()],
        queries,
        self.datasource.name,
        context,
    )
    for m in proposed_metrics.metrics:
        if m.name.lower() == "example":
            m.name = "example_ds"
            logger.info(
                "Renamed metric 'example' to 'example_ds' to avoid collision with few shot examples."
            )

    self.metrics = proposed_metrics.metrics
    logfire.info(
        "{num_metrics} metrics proposed in semantic layer",
        num_metrics=len(self.metrics),
    )
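
context is free-form text. A sketch of passing business context alongside the questions (the context string is illustrative):

source.semantic_layer.propose(
    ["Which carrier flies the most routes?"],
    context="Each row in public.flights is one scheduled flight leg.",
)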

refine(pr=False)

Refines the semantic layer based on the feedback and creates a PR with the changes. By default, sets the refined metrics, but does not persist on disk -- see dump() to persist.

If pr=True, attempts to create a PR on the configured GitHub repo (see Configuration) after updating the metrics in the in-memory semantic layer. If it is successful, returns the URL of the PR. Else, returns None.


show()

Prints a table of metrics. Not yet implemented (raises NotImplementedError).


Dimension

Bases: BaseModel

A dimension for a metric.

This is similar to a dimension in LookML -- it can be used to group or filter data in a metric. For example, a dimension could be a column in a table or a calculation based on columns in a table.

Source code in src/relta/semantic/base.py
class Dimension(BaseModel):
    """A dimension for a metric.

    This is similar to a dimension in LookML -- it can be used to group or filter data in a metric.
    For example, a dimension could be a column in a table or a calculation based on columns in a table.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: str = Field(
        description="A short name of the dimension. Must be unique within the metric. It can be the same as the column name."
    )

    description: str = Field(
        description="A longer description of the dimension. Keep it within 3 sentences."
    )

    categories: Optional[list] = Field(
        description="The categories of the dimension. This is used to create a list of values for the dimension. This should not be filled out when setting up the semantic layer."
    )

    skip_categorical_load: bool = Field(
        default=False,
        description="Controls whether to load the categorical values for this dimension. Defaults to False. To be used if the data is large text."
    )

    dtype: Optional[str] = Field(description="The data type of the dimension.")
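
A construction sketch mirroring the carrier dimension from the metric example above (the import path is assumed from the source location shown):

from relta.semantic.base import Dimension  # import path assumed from src/relta/semantic/base.py

dim = Dimension(
    name="carrier",
    description="The code of the carrier operating the flight.",
    categories=None,  # populated by Relta later, not when setting up the layer
    dtype="VARCHAR",
)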

ExampleCollection

Bases: BaseModel

Used for persistence (load/dump) of examples.

Source code in src/relta/semantic/base.py
class ExampleCollection(BaseModel):
    """Used for persistence (load/dump) of examples."""

    examples: list[Example] = Field(description="A collection of examples.")

Measure

Bases: BaseModel

A measure for a metric.

This is similar to a measure in LookML -- it is an aggregate operation on some dimensions. For example, a measure could be the sum of a column or the average of columnA * columnB.

Source code in src/relta/semantic/base.py
class Measure(BaseModel):
    """A measure for a metric.

    This is similar to a measure in LookML -- it is an aggregate operation on some dimensions.
    For example, a measure could be the sum of a column or the average of columnA * columnB.
    """

    name: str = Field(
        description="A short name of the measure. Must be unique within the metric."
    )
    description: str = Field(
        description="A longer description of the measure. Keep it within 3 sentences."
    )
    expr: str = Field(
        description="The SQL expression for this aggregate operation. This must be a valid PostgreSQL expression. Any columns used should also be defined as dimensions."
    )
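
A matching sketch for the route_count measure from the metric example (same import-path assumption as above):

from relta.semantic.base import Measure  # import path assumed from src/relta/semantic/base.py

route_count = Measure(
    name="route_count",
    description="The total number of routes flown by the carrier.",
    expr="COUNT(DISTINCT origin || '-' || destination)",  # columns used must be defined as dimensions
)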