Model Examples

DataSurface uses a declarative Python DSL to define your entire data ecosystem. Here are examples of how to model common patterns.

Defining a Producer (Ingestion)

This snippet defines a Datastore that pulls data from an external SQL database. It uses Environment Maps to switch between Prod and QA databases automatically.


def create_producer_model(eco: Ecosystem):
    """Register the prod/qa environment maps and the Store1 producer on team1."""
    # Look up the governance zone and owning team; both must already exist.
    zone = eco.getZoneOrThrow("USA")
    owning_team = zone.getTeamOrThrow("team1")

    # 1. Environment maps: the logical reference "customer_db" resolves to a
    #    different physical Postgres instance depending on the active environment.
    qa_db = PostgresDatabase(
        "CustomerDB_QA",
        hostPort=HostPortPair("pg-data.qa.svc.cluster.local", 5432),
        locations={LocationKey("MyCorp:USA/NY_1")},
        databaseName="customer_db_qa",
        productionStatus=ProductionStatus.NOT_PRODUCTION
    )
    owning_team.add(EnvironmentMap(
        keyword="qa",
        dataContainers={frozenset(["customer_db"]): qa_db}
    ))

    prod_db = PostgresDatabase(
        "CustomerDB",
        hostPort=HostPortPair("pg-data.prod.svc.cluster.local", 5432),
        locations={LocationKey("MyCorp:USA/NY_1")},
        databaseName="customer_db",
        productionStatus=ProductionStatus.PRODUCTION
    )
    owning_team.add(EnvironmentMap(
        keyword="prod",
        dataContainers={frozenset(["customer_db"]): prod_db}
    ))

    # 2. The producer Datastore: snapshot-ingests from whichever database the
    #    environment map resolves "customer_db" to, on a 5-minute cron schedule.
    customers_schema = DDLTable(
        columns=[
            DDLColumn("id", VarChar(20), nullable=NullableStatus.NOT_NULLABLE, primary_key=PrimaryKeyStatus.PK),
            DDLColumn("firstname", VarChar(100), nullable=NullableStatus.NOT_NULLABLE),
            DDLColumn("email", VarChar(100))
        ]
    )
    ingestion = SQLSnapshotIngestion(
        EnvRefDataContainer("customer_db"),  # resolved via the environment maps above
        CronTrigger("Every 5 minute", "*/5 * * * *"),
        IngestionConsistencyType.MULTI_DATASET,
        Credential("postgres", CredentialType.USER_PASSWORD)
    )
    owning_team.add(Datastore(
        "Store1",
        documentation=PlainTextDocumentation("Customer Data Store"),
        capture_metadata=ingestion,
        datasets=[Dataset("customers", schema=customers_schema)]
    ))
            

Defining a Consumer Workspace

Workspaces define consumer contracts: they group the datasets a consumer requires and declare the service levels it needs, such as history retention and latency.


def create_consumer_model(team: Team):
    """Add the Consumer1 Workspace with a live and a forensic dataset group."""
    # Retention policies: live keeps only the latest values, forensic keeps
    # full change history; both require minutes-level latency.
    live_policy = WorkspacePlatformConfig(
        hist=ConsumerRetentionRequirements(
            r=DataMilestoningStrategy.LIVE_ONLY,
            latency=DataLatency.MINUTES
        )
    )
    forensic_policy = WorkspacePlatformConfig(
        hist=ConsumerRetentionRequirements(
            r=DataMilestoningStrategy.FORENSIC,
            latency=DataLatency.MINUTES
        )
    )

    live_group = DatasetGroup(
        "LiveDSG",
        sinks=[
            DatasetSink("Store1", "customers"),
            DatasetSink("Store1", "addresses")
        ],
        platform_chooser=live_policy,
    )
    forensic_group = DatasetGroup(
        "ForensicDSG",
        sinks=[DatasetSink("Store1", "customers")],
        platform_chooser=forensic_policy
    )

    # The platform provisions and manages the backing container itself.
    team.add(Workspace(
        "Consumer1",
        DataPlatformManagedDataContainer("Consumer1 container"),
        live_group,
        forensic_group
    ))
            

Defining a Data Transformer

Transformers consume data from a Workspace and produce new Datasets. They use Environment Maps to manage code versions and configuration across Prod/QA.


def create_transformer_model(team: Team):
    """Register per-environment maps and the customer-masking DataTransformer."""
    # 1. Environment maps: each environment binds the "custMaskRev" release
    #    selector to its own code tag and "mask_customer_dt" to its own key.
    env_settings = (
        ("qa", "v1.0-qa", "qa_secret_key"),
        ("prod", "v1.0-prod", "prod_secret_key"),
    )
    for env_keyword, code_tag, mask_key in env_settings:
        team.add(EnvironmentMap(
            keyword=env_keyword,
            dtReleaseSelectors={
                "custMaskRev": VersionPatternReleaseSelector(code_tag, ReleaseType.STABLE_ONLY)
            },
            configMaps=[
                StaticConfigMap("mask_customer_dt", {"mask_key": mask_key})
            ]
        ))

    # 2. Output Datastore produced by the transformer.
    masked_store = Datastore(
        name="MaskedCustomers",
        documentation=PlainTextDocumentation("Anonymized Customer Data"),
        datasets=[
            Dataset(
                "customers",
                schema=DDLTable(
                    columns=[
                        DDLColumn("id", VarChar(20), nullable=NullableStatus.NOT_NULLABLE, primary_key=PrimaryKeyStatus.PK),
                        DDLColumn("hashed_name", VarChar(100), nullable=NullableStatus.NOT_NULLABLE)
                    ]
                ),
                classifications=[SimpleDC(SimpleDCTypes.PUB, "Public")]
            )
        ]
    )

    # Code artifact: the repository revision resolves through "custMaskRev".
    masking_code = PythonRepoCodeArtifact(
        VersionedRepository(
            GitHubRepository("myorg/transforms", "main"),
            EnvRefReleaseSelector("custMaskRev")
        )
    )

    # 3. The transformer Workspace: reads Store1 customers, emits masked rows.
    team.add(Workspace(
        "MaskingWorkspace",
        DataPlatformManagedDataContainer("MaskingContainer"),
        DatasetGroup("InputData", sinks=[DatasetSink("Store1", "customers")]),
        DataTransformer(
            name="MaskedCustomerGenerator",
            code=masking_code,
            kv=EnvRefConfigMap("mask_customer_dt"),  # configuration resolves per environment
            trigger=CronTrigger("Every 1 minute", "*/1 * * * *"),
            store=masked_store
        )
    ))
            

Defining Runtime Environments (RTEs)

RTEs bind logical environments to specific infrastructure and code versions. This snippet shows how to configure both a Production and a QA environment.


def create_runtime_environments(eco: Ecosystem, psp_prod: PlatformServiceProvider, psp_qa: PlatformServiceProvider):
    """Bind the prod and qa RTEs to their PSPs and model-version selectors."""
    # (environment name, PSP, model-tag suffix, production status)
    bindings = (
        ("prod", psp_prod, "-prod", ProductionStatus.PRODUCTION),
        ("qa", psp_qa, "-qa", ProductionStatus.NOT_PRODUCTION),
    )
    for env_name, psp, tag_suffix, status in bindings:
        rte = eco.getRuntimeEnvironmentOrThrow(env_name)
        # Models tagged for this environment run on its dedicated PSP.
        rte.configure(
            VersionPatternReleaseSelector(VersionPatterns.VN_N_N + tag_suffix, ReleaseType.STABLE_ONLY),
            [PSPDeclaration(psp.name, eco.owningRepo)],
            status
        )
        rte.setPSP(psp)