Semantic Layer

A SemanticLayer object in Relta represents the metrics that users can ask questions about for a given DataSource.

Structure

A metric can be thought of as a SQL view on the tables of the DataSource. A metric definition contains dimensions, which represent the columns of the view, and measures, which represent the aggregate functions that can be applied to those dimensions.

An example of a metric is:

{
  "name": "carrier_route_analysis",
  "description": "This metric provides the count of routes per carrier and identifies the most popular route for each carrier.",
  "datasource": "db",
  "dimensions": [
    {
      "name": "carrier",
      "description": "The code of the carrier operating the flight.",
      "categories": null,
      "dtype": "VARCHAR"
    },
    {
      "name": "carrier_name",
      "description": "The full name of the carrier.",
      "categories": null,
      "dtype": "VARCHAR"
    },
    {
      "name": "origin",
      "description": "The origin airport code for the flight.",
      "categories": null,
      "dtype": "VARCHAR"
    },
    {
      "name": "destination",
      "description": "The destination airport code for the flight.",
      "categories": null,
      "dtype": "VARCHAR"
    }
  ],
  "measures": [
    {
      "name": "route_count",
      "description": "The total number of routes flown by the carrier.",
      "expr": "COUNT(DISTINCT origin || '-' || destination)"
    }
  ],
  "sample_questions": [
    "Give me the full names of the top 5 carriers that flew the most routes and what their most popular route is."
  ],
  "sql_to_underlying_datasource": "SELECT f.carrier, c.name AS carrier_name, f.origin, f.destination FROM public.flights f JOIN public.carriers c ON f.carrier = c.code"
}

The sql_to_underlying_datasource field is the SELECT statement used in the CREATE VIEW statement for the metric; it can contain arbitrary SQL over the underlying tables (joins, expressions, and so on).

The SQL Agent is given the SemanticLayer, which contains the list of metrics, and it only uses the dimensions and measures defined there.

An example of SQL generated from that metric is:

-- Which airline flies the fewest routes?
SELECT carrier_name,
       COUNT(DISTINCT origin || '-' || destination) AS route_count
FROM   carrier_route_analysis
GROUP BY carrier_name
ORDER BY route_count ASC LIMIT 1;

Observe that the SQL Agent only uses the dimensions and measures defined in the metric, and the metric itself is the view that data is pulled from. In fact, the SQL Agent does not have access to the original tables or columns, nor even to the CREATE VIEW statement that generates the view -- it only has the metric's dimensions and measures. So the SQL Agent cannot work with any data not declared in the metric.
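
For instance, you can inspect exactly what the SQL Agent is given for a metric. A minimal sketch, assuming the carrier_route_analysis metric above has been loaded for a "flights" DataSource (as in the Usage section below):

import relta

rc = relta.Client()
source = rc.get_datasource("flights")

# The agent only sees each metric's dimensions and measures, never the underlying tables.
metric = next(m for m in source.semantic_layer.metrics if m.name == "carrier_route_analysis")
print([d.name for d in metric.dimensions])  # ['carrier', 'carrier_name', 'origin', 'destination']
print([ms.expr for ms in metric.measures])  # ["COUNT(DISTINCT origin || '-' || destination)"]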

Usage

A SemanticLayer object is automatically created when a DataSource object is created (either by getting a DataSource from disk or creating a new one). It is accessible from the semantic_layer property of a DataSource.

By default, the SemanticLayer automatically loads in metrics that have been previously defined in its directory, which defaults to .relta/semantic_layer/{datasource_name}.

import relta

rc = relta.Client()
source = rc.get_datasource("flights")
print(source.semantic_layer.metrics)  # [Metric(name='most_frequent_airlines_by_airport', ...), ...]

To generate metrics from scratch for a semantic layer, you can use the propose method. We cover how to do this with the CLI in Getting Started, but present it here using library code. Observe that we only pass natural language questions, and optionally business context, to the propose method.

import os
import relta

rc = relta.Client()
source = rc.create_datasource(os.environ["PG_AIRPORTS_DSN"], "airports")
source.semantic_layer.propose([
    "Which airport has the longest security line?",
    "Which airports have CLEAR?",
    "Which airport should I go to if I'm flying to LA from NY, if I want to avoid cost?"
])

If you want to edit a metric, you can do it programmatically or by editing its YAML (or legacy JSON) file on disk.
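
For example, here is a minimal sketch of a programmatic edit, assuming the flights DataSource and the carrier_route_analysis metric from earlier (the new description and sample question are illustrative):

import relta

rc = relta.Client()
source = rc.get_datasource("flights")
layer = source.semantic_layer

# Adjust an existing metric in memory, then persist the change to disk.
metric = next(m for m in layer.metrics if m.name == "carrier_route_analysis")
metric.description = "Counts routes per carrier and surfaces each carrier's most popular route."
metric.sample_questions.append("Which carrier flies the most distinct routes?")

layer.dump()  # rewrites the metric files under .relta/semantic_layer/flights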

Refining Metrics

If a user gives feedback on a metric, the relevant Response is added to the SemanticLayer's feedback_responses list; you can then call refine on the layer to suggest updates to the semantic layer:

import relta

rc = relta.Client()
source = rc.get_datasource("airports")

chat = rc.create_chat(source)
resp = chat.prompt("which airport is the biggest?")
resp.feedback("negative", "please add support for airport size and passenger volume")

source.semantic_layer.refine(pr=True)

Passing pr=True will automatically open a PR with the changes, provided your Configuration includes the optional GITHUB_* variables.
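
refine returns the URL of the PR when one is created, so you can surface it to the user. A small sketch, continuing the example above:

url = source.semantic_layer.refine(pr=True)
if url is not None:
    print(f"Review the suggested metric changes: {url}")
else:
    # If no PR is created, the refined metrics only live in memory
    # until you call source.semantic_layer.dump().
    print("No PR created; call dump() to persist the refined metrics locally.")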

API Reference

Source code in src/relta/semantic/semantic_layer.py
class SemanticLayer:
    def __init__(
        self,
        datasource: "DataSource",  # noqa: F821 # type: ignore
        config: Configuration,
        load: bool = True,
        path: Optional[str] = None,
    ):
        from ..datasource import (
            DataSource,
        )  # this is to allow type hinting when writing code w/o a circular import (similar to other libraries)

        self._config = config
        self.datasource: DataSource = datasource
        self.path = (
            self._config.relta_semantic_layer_dir_path / self.datasource.name
            if path is None
            else Path(path)
        )
        self.update_reasoning: str = (
            ""  # semantic agent will write reasoning about the update here
        )
        self.feedback_responses = []
        self.metrics: list[Metric] = []
        self.examples: list[Example] = []
        # self.proposed_changes: list[Metric] = []
        makedirs(self.path, exist_ok=True)
        if load:
            self.load()

    def load(
        self,
        path: Optional[Union[str, Path]] = None,
        json: bool = True,
        metrics_to_load: Optional[list[str]] = None,
    ):
        """Load semantic layer.

        Changes to the metrics are not persisted on disk. Use `.dump()` to persist them.

        Args:
            path (Optional[Union[str, Path]], optional): Path to load the semantic layer. If None, uses `self.path`, which is populated on creation.
            json (bool, optional): Whether to additionally load the semantic layer from deprecated JSON files.
                If a metric exists in both JSON and YAML files by `metric.name`, the JSON metric is ignored. Defaults to True.
            metrics_to_load (Optional[list[str]], optional): List of metric names to load. If None, loads all metrics. Defaults to None.
        """
        logfire.info("loading semantic layer from {path}", path=str(path))
        p = Path(path) if path is not None else self.path
        metrics = []
        yaml_metric_names = set()
        examples = []

        for fpath in p.glob("*.yaml"):
            with open(fpath, "r") as f:
                data = yaml.safe_load(f)
                if fpath.name == "examples.yaml":
                    example_coll = ExampleCollection.model_validate(data)
                    examples.extend(example_coll.examples)
                else:
                    metric = Metric.model_validate(data)
                    if metrics_to_load is None or metric.name in metrics_to_load:
                        metrics.append(metric)
                        yaml_metric_names.add(metric.name)

        if json:
            for fpath in p.glob("*.json"):
                with open(fpath, "r") as f:
                    if fpath.name == "examples.json":
                        example_coll = ExampleCollection.model_validate_json(f.read())
                        examples.extend(example_coll.examples)
                    else:
                        metric = Metric.model_validate_json(f.read())
                        if (
                            metrics_to_load is None or metric.name in metrics_to_load
                        ) and metric.name not in yaml_metric_names:
                            metrics.append(metric)
        self.metrics = metrics
        self.examples = examples

    def dump(
        self,
        clear=True,
        path: Optional[Union[str, Path]] = None,
        # apply_proposals: bool = True,
    ):
        """Dumps the semantic layer, accepting any updates made to the semantic layer.

        Args:
            clear (bool): Delete all JSON/YAML files in the path for this layer. Defaults to True. See `path` attribute for details on the path.
            path (Optional[Union[str, Path]], optional): Path to dump the semantic layer. If None, uses `self.path`, which is populated on creation.
        """

        logfire.info("dumping semantic layer to file")
        p = Path(path) if path is not None else self.path

        if clear:
            for yaml_file in p.glob("*.yaml"):
                yaml_file.unlink()
            for json_file in p.glob("*.json"):
                json_file.unlink()

        for metric in self.metrics:
            with open(p / f"{metric.name}.yaml", "w+") as f:
                yaml.dump(metric.model_dump(), f, sort_keys=False)

        examples = ExampleCollection(examples=self.examples)
        with open(p / "examples.yaml", "w+") as f:
            yaml.dump(examples.model_dump(), f, sort_keys=False)

        # additionally, as dumping is "accepting" the changes, we clean up any updated state
        self.update_reasoning = ""

    def dumps(self, mode: Literal["json", "yaml"] = "yaml", **kwargs) -> str:
        """Dumps the metrics and examples to a JSON or YAML string. JSON is typically used for feeding into an agent and YAML for display.

        Args:
            **kwargs: Keyword arguments to pass to `pydantic.BaseModel.model_dump_json` or `yaml.dump` depending on mode.
            Will override, by individual key, `Configuration`'s default kwargs for printing JSON/YAML.

        Returns:
            str: JSON or YAML representation of the semantic layer (metrics and examples).
        """
        default_kwargs = (
            self._config.json_dumps_kwargs
            if mode == "json"
            else self._config.yaml_dumps_kwargs
        )

        default_kwargs.update(kwargs)

        ctr = SemanticLayerContainer(**vars(self))

        return (
            ctr.model_dump_json(**default_kwargs)
            if mode == "json"
            else yaml.dump(ctr.model_dump(), stream=None, **default_kwargs)
        )

    def refine(self, pr=False):
        """Refines the semantic layer based on the feedback and creates a PR with the changes. By default, sets the refined metrics, but does not persist on disk -- see `dump()` to persist.

        If `pr=True`, attempts to create a PR on the configured GitHub repo (see `Configuration`) after updating the metrics in the in-memory semantic layer.
        If it is successful, returns the URL of the PR. Else, returns `None`.
        """
        logfire.info("refine semantic layer")
        llm = ChatOpenAI(model="gpt-4o", temperature=0).with_structured_output(
            RefinedMetrics
        )
        prompt = PromptTemplate.from_template(REFINE_SEMANTIC_LAYER_PROMPT)

        chain = prompt | llm

        feedback = [
            Feedback(
                sentiment=r.feedback_sentiment,
                reason=r.feedback_reason,
                selected_response=r.message,
                message_history=r.chat._get_messages(),
            )
            for r in self.feedback_responses
        ]

        result: RefinedMetrics = chain.invoke(
            {
                "METRIC_MODEL": json.dumps(
                    Metric.model_json_schema(mode="serialization"), indent=2
                ),
                "METRICS": json.dumps(
                    [metric.model_dump() for metric in self.metrics], indent=2
                ),
                "FEEDBACK": json.dumps(
                    [feedback.dict() for feedback in feedback], indent=2
                ),
                "DDL": self.datasource._get_ddl(),
            }
        )

        existing_metrics = {m.name: m for m in self.metrics}
        refined_metrics = {m.original_name: m.updated_metric for m in result.metrics}

        for name, refined_metric in refined_metrics.items():
            if name in existing_metrics:
                existing_metric = existing_metrics[name]
                print(f"Metric: {name}")

                for field in Metric.model_fields:
                    refined_value = getattr(refined_metric, field)
                    existing_value = getattr(existing_metric, field)
                    if refined_value != existing_value:
                        print(f"  {field}:")
                        print(f"    - Old: {existing_value}")
                        print(f"    + New: {refined_value}")

                        # Handle list fields like dimensions, measures, sample_questions
                        if isinstance(refined_value, list):
                            refined_set = set(str(x) for x in refined_value)
                            existing_set = set(str(x) for x in existing_value)

                            removed = existing_set - refined_set
                            added = refined_set - existing_set

                            if removed:
                                print("    Removed items:")
                                for item in removed:
                                    print(f"      - {item}")

                            if added:
                                print("    Added items:")
                                for item in added:
                                    print(f"      + {item}")

                print()
            else:
                print(f"New Metric: {name}")
                print(f"  + {refined_metric.model_dump_json(indent=2)}")
                print()

        # for metric_container in result.metrics:
        #     self.proposed_changes.append(metric_container.updated_metric)
        self.metrics = [m.updated_metric for m in result.metrics]

        if pr:
            # Create a new branch and open a PR with the refined metrics
            res = self._create_pr_with_refined_metrics(
                [update.updated_metric for update in result.metrics]
            )
            if res:
                return res
            else:
                print("Failed to create a PR.")
                return None
        else:
            print("Not creating a PR.")

    def _create_pr_with_refined_metrics(
        self, refined_metrics: list[Metric]
    ) -> Optional[str]:
        """Creates a new branch with refined metrics and opens a PR. Returns URL of PR if successful, else None."""
        try:
            g = Github(self._config.github_token)
            repo = g.get_repo(self._config.github_repo)

            # Create a new branch
            base_branch = repo.get_branch(self._config.github_base_branch)
            branch_name = f"refined-metrics-{uuid.uuid4().hex[:8]}"
            repo.create_git_ref(f"refs/heads/{branch_name}", base_branch.commit.sha)

            # Update metrics files in the new branch
            for metric in refined_metrics:
                file_path = f"{self._config.relta_semantic_layer_dir_path}/{self.datasource.name}/{metric.name}.json"
                content = metric.model_dump_json(indent=2)

                try:
                    file = repo.get_contents(file_path, ref=branch_name)
                    repo.update_file(
                        file_path,
                        f"Update {metric.name} metric",
                        content,
                        file.sha,
                        branch=branch_name,
                    )
                except GithubException:
                    repo.create_file(
                        file_path,
                        f"Add {metric.name} metric",
                        content,
                        branch=branch_name,
                    )

            # Create a pull request
            pr_title = f"Refined metrics for {self.datasource.name}"
            pr_body = "This PR contains refined metrics based on user feedback."
            pr = repo.create_pull(
                title=pr_title,
                body=pr_body,
                head=branch_name,
                base=self._config.github_base_branch,
            )

            print(
                f"Created a pull request with refined metrics: {pr_title}, id: {pr.id}, url: {pr.html_url}"
            )
            return pr.html_url
        except Exception as e:
            print(f"Error creating PR with refined metrics: {str(e)}")
            return None

    def copy(
        self,
        source: "DataSource",  # noqa: F821
        dump: bool = True,
    ):
        """Copy the semantic layer from another DataSource.

        Args:
            source (DataSource): `DataSource` object to copy the semantic layer from.
            # from_path (Optional[Union[str, Path]], optional): Path to load the semantic layer from, ignoring `source`. If None, uses `source`'s semantic layer. Defaults to None.
            dump (bool, optional): Whether to dump the semantic layer to its path. Defaults to True.
        """
        # if from_path is None:
        self.metrics = [
            metric.model_copy(deep=True) for metric in source.semantic_layer.metrics
        ]
        self.examples = [
            example.model_copy(deep=True) for example in source.semantic_layer.examples
        ]
        # else:
        #     self.load(from_path)

        if dump:
            self.dump()

    def propose(
        self,
        queries: list[str],
        context: Optional[str] = None,
    ):
        """Proposes a new semantic layer for the given datasource and natural language queries.

        Args:
            queries (list[str]): A list of natural language queries that the semantic layer should answer.
            context (Optional[str], optional): Extra information about the datasource. Defaults to None.
        """
        logfire.span("proposing new semantic layer")
        proposed_metrics = self._generate_proposed_metrics(
            [self.datasource._get_ddl()],
            queries,
            self.datasource.name,
            context,
        )
        for m in proposed_metrics.metrics:
            if m.name.lower() == "example":
                m.name = "example_ds"
                logger.info(
                    "Renamed metric 'example' to 'example_ds' to avoid collision with few shot examples."
                )

            # Check the SQL query for errors, repair if necessary
            passed, error = self.datasource._validate_sql_with_explain(m.sql_to_underlying_datasource)
            m.sql_to_underlying_datasource = self._repair_metric_sql(m, error) if not passed else m.sql_to_underlying_datasource

        self.metrics = proposed_metrics.metrics
        logfire.info(
            "{num_metrics} metrics proposed in semantic layer",
            num_metrics=len(self.metrics),
        )

    def show(self):
        """Prints table of metrics."""
        raise NotImplementedError()

    def _update(self, container: SemanticLayerContainer):
        self.metrics = container.metrics
        self.examples = container.examples
        self.update_reasoning = container.update_reasoning

    def _repair_metric_sql(self, metric: Metric, error: str = None, max_attempts: int = 5):
        """Attempts to repair a SQL query for a given metric.

        This method uses a language model to attempt to repair a SQL query that has errors. 
        It tries up to a specified number of attempts to fix the query based on the provided error message.

        Args:
            metric (Metric): The metric object containing the SQL query to be repaired.
            error (str, optional): The error message from the SQL validation. Defaults to None.
            max_attempts (int, optional): The maximum number of attempts to repair the SQL query. Defaults to 5.

        Returns:
            str: The repaired SQL query if successful, otherwise the original SQL query.
        """
        repaired = False
        attempt_count = 0
        llm = ChatOpenAI(model="gpt-4o", temperature=0).with_structured_output(SqlQuery)
        prompt = PromptTemplate.from_template(METRIC_SQL_REPAIR_PROMPT)
        chain = prompt | llm

        while not repaired and attempt_count < max_attempts:
            attempt_count += 1
            result: SqlQuery = chain.invoke(
                {
                    "QUERY": metric.sql_to_underlying_datasource,
                    "ERROR": error,
                    "METRIC_NAME": metric.name,
                    "METRIC_DESCRIPTION": metric.description,
                    "DIMENSIONS": "\n".join([d.name for d in metric.dimensions]),
                    "DDL": self.datasource._get_ddl(),
                }
            )
            repaired, error = self.datasource._validate_sql_with_explain(result.query)
        if repaired:
            logger.info(f"Repaired SQL query for metric {metric.name} after {attempt_count} attempts")
        else:
            logger.error(f"Failed to repair SQL query for metric {metric.name}. Error: {error}")
        return result.query if repaired else metric.sql_to_underlying_datasource


    @staticmethod
    def _generate_proposed_metrics(
        ddl: list[str],
        questions: list[str],
        source_name: str,
        context: Optional[str] = None,
    ) -> ProposedMetrics:
        """Generates a list of metrics for the given datasource and natural language queries.

        Args:
            ddl (list[str]): The DDL for the datasource.
            questions (list[str]): A list of natural language queries that the semantic layer should answer.
            source_name (str): The name of the datasource.
            context (str, optional): Extra information about the datasource. Defaults to None.
        """
        llm = ChatOpenAI(
            model="gpt-4o-2024-08-06", temperature=0
        ).with_structured_output(ProposedMetrics)
        prompt = PromptTemplate.from_template(SYSTEM_PROMPT_SEMANTIC_LAYER_BUILDER)

        chain = prompt | llm  # | parser
        result: ProposedMetrics = chain.invoke(
            {
                "QUESTIONS": "\n".join(questions),
                "DDL": "\n".join(ddl),
                "CONTEXT": context,
                "DATASOURCE_NAME": source_name,
            }
        )

        return result

copy(source, dump=True)

Copy the semantic layer from another DataSource.

Parameters:

    source (DataSource): DataSource object to copy the semantic layer from. Required.
    dump (bool, optional): Whether to dump the semantic layer to its path. Defaults to True.
Source code in src/relta/semantic/semantic_layer.py
def copy(
    self,
    source: "DataSource",  # noqa: F821
    dump: bool = True,
):
    """Copy the semantic layer from another DataSource.

    Args:
        source (DataSource): `DataSource` object to copy the semantic layer from.
        # from_path (Optional[Union[str, Path]], optional): Path to load the semantic layer from, ignoring `source`. If None, uses `source`'s semantic layer. Defaults to None.
        dump (bool, optional): Whether to dump the semantic layer to its path. Defaults to True.
    """
    # if from_path is None:
    self.metrics = [
        metric.model_copy(deep=True) for metric in source.semantic_layer.metrics
    ]
    self.examples = [
        example.model_copy(deep=True) for example in source.semantic_layer.examples
    ]
    # else:
    #     self.load(from_path)

    if dump:
        self.dump()
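
A short usage sketch (the staging DSN environment variable and datasource names are illustrative):

import os
import relta

rc = relta.Client()
flights = rc.get_datasource("flights")
staging = rc.create_datasource(os.environ["STAGING_FLIGHTS_DSN"], "flights_staging")

# Reuse the existing layer on the new DataSource and write it to disk.
staging.semantic_layer.copy(flights, dump=True)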

dump(clear=True, path=None)

Dumps the semantic layer, accepting any updates made to the semantic layer.

Parameters:

    clear (bool): Delete all JSON/YAML files in the path for this layer. Defaults to True. See the path attribute for details on the path.
    path (Optional[Union[str, Path]], optional): Path to dump the semantic layer. If None, uses self.path, which is populated on creation.
Source code in src/relta/semantic/semantic_layer.py
def dump(
    self,
    clear=True,
    path: Optional[Union[str, Path]] = None,
    # apply_proposals: bool = True,
):
    """Dumps the semantic layer, accepting any updates made to the semantic layer.

    Args:
        clear (bool): Delete all JSON/YAML files in the path for this layer. Defaults to True. See `path` attribute for details on the path.
        path (Optional[Union[str, Path]], optional): Path to dump the semantic layer. If None, uses `self.path`, which is populated on creation.
    """

    logfire.info("dumping semantic layer to file")
    p = Path(path) if path is not None else self.path

    if clear:
        for yaml_file in p.glob("*.yaml"):
            yaml_file.unlink()
        for json_file in p.glob("*.json"):
            json_file.unlink()

    for metric in self.metrics:
        with open(p / f"{metric.name}.yaml", "w+") as f:
            yaml.dump(metric.model_dump(), f, sort_keys=False)

    examples = ExampleCollection(examples=self.examples)
    with open(p / "examples.yaml", "w+") as f:
        yaml.dump(examples.model_dump(), f, sort_keys=False)

    # additionally, as dumping is "accepting" the changes, we clean up any updated state
    self.update_reasoning = ""
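
For example, to write the current in-memory metrics to an alternate directory for review (the location below is illustrative; dump() does not create the directory itself):

import os
from pathlib import Path

import relta

rc = relta.Client()
source = rc.get_datasource("flights")

review_dir = Path(".relta/review/flights")
os.makedirs(review_dir, exist_ok=True)
source.semantic_layer.dump(clear=False, path=review_dir)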

dumps(mode='yaml', **kwargs)

Dumps the metrics and examples to a JSON or YAML string. JSON is typically used for feeding into an agent and YAML for display.

Parameters:

    **kwargs: Keyword arguments to pass to pydantic.BaseModel.model_dump_json or yaml.dump, depending on mode. Defaults to {}.

Returns:

    str: JSON or YAML representation of the semantic layer (metrics and examples).

Source code in src/relta/semantic/semantic_layer.py
def dumps(self, mode: Literal["json", "yaml"] = "yaml", **kwargs) -> str:
    """Dumps the metrics and examples to a JSON or YAML string. JSON is typically used for feeding into an agent and YAML for display.

    Args:
        **kwargs: Keyword arguments to pass to `pydantic.BaseModel.model_dump_json` or `yaml.dump` depending on mode.
        Will override, by individual key, `Configuration`'s default kwargs for printing JSON/YAML.

    Returns:
        str: JSON or YAML representation of the semantic layer (metrics and examples).
    """
    default_kwargs = (
        self._config.json_dumps_kwargs
        if mode == "json"
        else self._config.yaml_dumps_kwargs
    )

    default_kwargs.update(kwargs)

    ctr = SemanticLayerContainer(**vars(self))

    return (
        ctr.model_dump_json(**default_kwargs)
        if mode == "json"
        else yaml.dump(ctr.model_dump(), stream=None, **default_kwargs)
    )
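
A quick sketch of both modes, assuming a loaded DataSource:

import relta

rc = relta.Client()
source = rc.get_datasource("flights")

print(source.semantic_layer.dumps())                      # YAML, convenient for display
agent_payload = source.semantic_layer.dumps(mode="json")  # JSON, as fed to agents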

load(path=None, json=True, metrics_to_load=None)

Load semantic layer.

Changes to the metrics are not persisted on disk. Use .dump() to persist them.

Parameters:

    path (Optional[Union[str, Path]], optional): Path to load the semantic layer from. If None, uses self.path, which is populated on creation.
    json (bool, optional): Whether to additionally load the semantic layer from deprecated JSON files. If a metric exists in both JSON and YAML files by metric.name, the JSON metric is ignored. Defaults to True.
    metrics_to_load (Optional[list[str]], optional): List of metric names to load. If None, loads all metrics. Defaults to None.
Source code in src/relta/semantic/semantic_layer.py
def load(
    self,
    path: Optional[Union[str, Path]] = None,
    json: bool = True,
    metrics_to_load: Optional[list[str]] = None,
):
    """Load semantic layer.

    Changes to the metrics are not persisted on disk. Use `.dump()` to persist them.

    Args:
        path (Optional[Union[str, Path]], optional): Path to load the semantic layer. If None, uses `self.path`, which is populated on creation.
        json (bool, optional): Whether to additionally load the semantic layer from deprecated JSON files.
            If a metric exists in both JSON and YAML files by `metric.name`, the JSON metric is ignored. Defaults to True.
        metrics_to_load (Optional[list[str]], optional): List of metric names to load. If None, loads all metrics. Defaults to None.
    """
    logfire.info("loading semantic layer from {path}", path=str(path))
    p = Path(path) if path is not None else self.path
    metrics = []
    yaml_metric_names = set()
    examples = []

    for fpath in p.glob("*.yaml"):
        with open(fpath, "r") as f:
            data = yaml.safe_load(f)
            if fpath.name == "examples.yaml":
                example_coll = ExampleCollection.model_validate(data)
                examples.extend(example_coll.examples)
            else:
                metric = Metric.model_validate(data)
                if metrics_to_load is None or metric.name in metrics_to_load:
                    metrics.append(metric)
                    yaml_metric_names.add(metric.name)

    if json:
        for fpath in p.glob("*.json"):
            with open(fpath, "r") as f:
                if fpath.name == "examples.json":
                    example_coll = ExampleCollection.model_validate_json(f.read())
                    examples.extend(example_coll.examples)
                else:
                    metric = Metric.model_validate_json(f.read())
                    if (
                        metrics_to_load is None or metric.name in metrics_to_load
                    ) and metric.name not in yaml_metric_names:
                        metrics.append(metric)
    self.metrics = metrics
    self.examples = examples
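
For example, to reload only a single metric while ignoring any deprecated JSON files (the metric name is taken from the example above):

import relta

rc = relta.Client()
source = rc.get_datasource("flights")

source.semantic_layer.load(json=False, metrics_to_load=["carrier_route_analysis"])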

propose(queries, context=None)

Proposes a new semantic layer for the given datasource and natural language queries.

Parameters:

    queries (list[str]): A list of natural language queries that the semantic layer should answer. Required.
    context (Optional[str], optional): Extra information about the datasource. Defaults to None.
Source code in src/relta/semantic/semantic_layer.py
def propose(
    self,
    queries: list[str],
    context: Optional[str] = None,
):
    """Proposes a new semantic layer for the given datasource and natural language queries.

    Args:
        queries (list[str]): A list of natural language queries that the semantic layer should answer.
        context (Optional[str], optional): Extra information about the datasource. Defaults to None.
    """
    logfire.span("proposing new semantic layer")
    proposed_metrics = self._generate_proposed_metrics(
        [self.datasource._get_ddl()],
        queries,
        self.datasource.name,
        context,
    )
    for m in proposed_metrics.metrics:
        if m.name.lower() == "example":
            m.name = "example_ds"
            logger.info(
                "Renamed metric 'example' to 'example_ds' to avoid collision with few shot examples."
            )

        # Check the SQL query for errors, repair if necessary
        passed, error = self.datasource._validate_sql_with_explain(m.sql_to_underlying_datasource)
        m.sql_to_underlying_datasource = self._repair_metric_sql(m, error) if not passed else m.sql_to_underlying_datasource

    self.metrics = proposed_metrics.metrics
    logfire.info(
        "{num_metrics} metrics proposed in semantic layer",
        num_metrics=len(self.metrics),
    )
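
As a sketch, the optional context argument can carry business knowledge alongside the questions (the context string below is illustrative):

import relta

rc = relta.Client()
source = rc.get_datasource("airports")  # assumes the airports DataSource from the Usage section

source.semantic_layer.propose(
    ["Which airports have the longest average security wait?"],
    context="Wait times are reported in minutes, with one row per airport per day.",
)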

refine(pr=False)

Refines the semantic layer based on the feedback and creates a PR with the changes. By default, sets the refined metrics, but does not persist on disk -- see dump() to persist.

If pr=True, attempts to create a PR on the configured GitHub repo (see Configuration) after updating the metrics in the in-memory semantic layer. If it is successful, returns the URL of the PR. Else, returns None.

Source code in src/relta/semantic/semantic_layer.py
def refine(self, pr=False):
    """Refines the semantic layer based on the feedback and creates a PR with the changes. By default, sets the refined metrics, but does not persist on disk -- see `dump()` to persist.

    If `pr=True`, attempts to create a PR on the configured GitHub repo (see `Configuration`) after updating the metrics in the in-memory semantic layer.
    If it is successful, returns the URL of the PR. Else, returns `None`.
    """
    logfire.info("refine semantic layer")
    llm = ChatOpenAI(model="gpt-4o", temperature=0).with_structured_output(
        RefinedMetrics
    )
    prompt = PromptTemplate.from_template(REFINE_SEMANTIC_LAYER_PROMPT)

    chain = prompt | llm

    feedback = [
        Feedback(
            sentiment=r.feedback_sentiment,
            reason=r.feedback_reason,
            selected_response=r.message,
            message_history=r.chat._get_messages(),
        )
        for r in self.feedback_responses
    ]

    result: RefinedMetrics = chain.invoke(
        {
            "METRIC_MODEL": json.dumps(
                Metric.model_json_schema(mode="serialization"), indent=2
            ),
            "METRICS": json.dumps(
                [metric.model_dump() for metric in self.metrics], indent=2
            ),
            "FEEDBACK": json.dumps(
                [feedback.dict() for feedback in feedback], indent=2
            ),
            "DDL": self.datasource._get_ddl(),
        }
    )

    existing_metrics = {m.name: m for m in self.metrics}
    refined_metrics = {m.original_name: m.updated_metric for m in result.metrics}

    for name, refined_metric in refined_metrics.items():
        if name in existing_metrics:
            existing_metric = existing_metrics[name]
            print(f"Metric: {name}")

            for field in Metric.model_fields:
                refined_value = getattr(refined_metric, field)
                existing_value = getattr(existing_metric, field)
                if refined_value != existing_value:
                    print(f"  {field}:")
                    print(f"    - Old: {existing_value}")
                    print(f"    + New: {refined_value}")

                    # Handle list fields like dimensions, measures, sample_questions
                    if isinstance(refined_value, list):
                        refined_set = set(str(x) for x in refined_value)
                        existing_set = set(str(x) for x in existing_value)

                        removed = existing_set - refined_set
                        added = refined_set - existing_set

                        if removed:
                            print("    Removed items:")
                            for item in removed:
                                print(f"      - {item}")

                        if added:
                            print("    Added items:")
                            for item in added:
                                print(f"      + {item}")

            print()
        else:
            print(f"New Metric: {name}")
            print(f"  + {refined_metric.model_dump_json(indent=2)}")
            print()

    # for metric_container in result.metrics:
    #     self.proposed_changes.append(metric_container.updated_metric)
    self.metrics = [m.updated_metric for m in result.metrics]

    if pr:
        # Create a new branch and open a PR with the refined metrics
        res = self._create_pr_with_refined_metrics(
            [update.updated_metric for update in result.metrics]
        )
        if res:
            return res
        else:
            print("Failed to create a PR.")
            return None
    else:
        print("Not creating a PR.")

show()

Prints table of metrics.

Source code in src/relta/semantic/semantic_layer.py
def show(self):
    """Prints table of metrics."""
    raise NotImplementedError()

Dimension

Bases: BaseModel

A dimension for a metric.

This is similar to a dimension in LookML -- it can be used to group or filter data in a metric. For example, a dimension could be a column in a table or a calculation based on columns in a table.

Source code in src/relta/semantic/base.py
class Dimension(BaseModel):
    """A dimension for a metric.

    This is similar to a dimension in LookML -- it can be used to group or filter data in a metric.
    For example, a dimension could be a column in a table or a calculation based on columns in a table.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: str = Field(
        description="A short name of the dimension. Must be unique within the metric. It can be the same as the column name."
    )

    description: str = Field(
        description="A longer description of the dimension. Keep it within 3 sentences."
    )

    categories: Optional[list] = Field(
        description="The categories of the dimension. This is used to create a list of values for the dimension. This should not be filled out when setting up the semantic layer."
    )

    skip_categorical_load: bool = Field(
        default=False,
        description="Controls whether to load the categorical values for this dimension. Defaults to False. To be used if the data is large text."
    )

    dtype: Optional[str] = Field(description="The data type of the dimension.")
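
As an illustration, a Dimension can be constructed directly from these fields; the import path below is assumed from the source location shown above:

from relta.semantic.base import Dimension  # assumed import path

origin = Dimension(
    name="origin",
    description="The origin airport code for the flight.",
    categories=None,
    dtype="VARCHAR",
)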

ExampleCollection

Bases: BaseModel

Used for persistence (load/dump) of examples.

Source code in src/relta/semantic/base.py
class ExampleCollection(BaseModel):
    """Used for persistence (load/dump) of examples."""

    examples: list[Example] = Field(description="A collection of examples.")

Measure

Bases: BaseModel

A measure for a metric.

This is similar to a measure in LookML -- it is an aggregate operation on some dimensions. For example, a measure could be the sum of a column or the average of columnA * columnB.

Source code in src/relta/semantic/base.py
class Measure(BaseModel):
    """A measure for a metric.

    This is similar to a measure in LookML -- it is an aggregate operation on some dimensions.
    For example, a measure could be the sum of a column or the average of columnA * columnB.
    """

    name: str = Field(
        description="A short name of the measure. Must be unique within the metric."
    )
    description: str = Field(
        description="A longer description of the measure. Keep it within 3 sentences."
    )
    expr: str = Field(
        description="The SQL expression for this aggregate operation. This must be a valid PostgreSQL expression. Any columns used should also be defined as dimensions."
    )