Interfaces Guide

This page explains how to use interfaces in your project.

Overview

Interfaces connect your project to different data sources. They follow the BaseInterface contract, providing get, put, get_many, put_many, and monitor methods.
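
As a rough illustration of that contract, the sketch below implements the five methods against a plain in-memory dict. It is not the real BaseInterface; the exact signatures, return shapes, and base-class import are assumptions, so consult the project source for the actual definition.

# Illustrative only: signatures and return shapes are assumptions, and a real
# interface would subclass BaseInterface from the project source.
from typing import Any, Callable, Optional


class DictInterface:
    """Toy interface backed by a local dict, mirroring the
    get/put/get_many/put_many/monitor contract described above."""

    def __init__(self, initial: Optional[dict] = None):
        self._store: dict = dict(initial or {})

    def get(self, name: str) -> Any:
        return self._store[name]

    def put(self, name: str, value: Any) -> None:
        self._store[name] = value

    def get_many(self, names: list) -> dict:
        return {n: self._store[n] for n in names}

    def put_many(self, values: dict) -> None:
        self._store.update(values)

    def monitor(self, name: str, callback: Callable[[str, Any], None]) -> None:
        # A real interface would register the callback with its data source;
        # here it is simply invoked once with the current value.
        callback(name, self._store.get(name))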

Available Interfaces

EPICS Interfaces (p4p / p4p_server)

The p4p interface connects to an external EPICS server. The p4p_server interface hosts its own p4p server for the specified PVs. Both share the same YAML configuration format. See the README for sample YAML.

k2eg Interface

Built on SLAC's k2eg, this interface retrieves data over the pva and ca protocols via Kafka. See the README for sample YAML.

FastAPI Interface (fastapi_server)

The fastapi_server interface exposes a REST API for submitting inference jobs and retrieving results. It manages an internal job queue and variable store, and embeds a uvicorn server.

Register it in your YAML config with type: "interface.fastapi_server".
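
With start_server: true (the default), the embedded uvicorn server is reachable as soon as the module is up. A quick smoke test against the default host and port, using only the Python standard library:

# Check that the embedded server is up; host and port are the defaults from
# the configuration table below.
import json
from urllib.request import urlopen

with urlopen("http://127.0.0.1:8000/health") as resp:
    print(json.load(resp))
    # expected: {"status": "ok", "type": "interface.fastapi_server"}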

Configuration Fields

| Field                 | Type         | Default          | Description                                          |
|-----------------------|--------------|------------------|------------------------------------------------------|
| name                  | string       | "fastapi_server" | Display name                                         |
| host                  | string       | "127.0.0.1"      | Bind address                                         |
| port                  | int          | 8000             | Bind port                                            |
| start_server          | bool         | true             | Whether to launch the embedded uvicorn server        |
| wait_for_server_start | bool         | false            | Block until the server is accepting connections      |
| startup_timeout_s     | float        | 2.0              | Maximum wait for startup, in seconds                 |
| input_queue_max       | int          | 1000             | Maximum queued jobs before rejecting (HTTP 429)      |
| output_queue_max      | int          | 1000             | Maximum completed jobs before the oldest is evicted  |
| cors_origins          | list[string] | []               | CORS allow-origins (empty = no CORS middleware)      |
| variables             | dict         | required         | Variable definitions (see below)                     |

Variable Fields

| Field      | Type   | Default     | Description                                        |
|------------|--------|-------------|----------------------------------------------------|
| mode       | string | "inout"     | One of in, out, or inout                           |
| type       | string | "scalar"    | One of scalar, waveform, array, or image           |
| default    | any    | 0.0 / zeros | Initial value (not supported for the image type)   |
| length     | int    | 10          | Array/waveform length when no default is provided  |
| image_size | dict   | (none)      | Required for the image type: {"x": int, "y": int}  |

Example YAML

modules:
  my_fastapi:
    name: "my_fastapi"
    type: "interface.fastapi_server"
    pub: "in_interface"
    sub:
      - "get_all"
      - "out_transformer"
    config:
      name: "my_fastapi_interface"
      host: "127.0.0.1"
      port: 8000
      start_server: true
      input_queue_max: 1000
      output_queue_max: 1000
      cors_origins:
        - "http://localhost:3000"
      variables:
        MY_INPUT_A:
          mode: in
          type: scalar
          default: 0.0
        MY_INPUT_B:
          mode: in
          type: array
          default: [1, 2, 3, 4, 5]
        MY_OUTPUT:
          mode: out
          type: scalar
          default: 0.0

REST API Endpoints

| Method | Path           | Description                                                                 |
|--------|----------------|-----------------------------------------------------------------------------|
| GET    | /health        | Health check; returns {"status": "ok", "type": "interface.fastapi_server"}  |
| GET    | /settings      | Variable metadata, queue limits, and route table                            |
| POST   | /submit        | Submit a single inference job                                               |
| POST   | /get           | Read current variable values                                                |
| POST   | /jobs          | Submit a batch of jobs                                                      |
| GET    | /jobs/next     | Dequeue the next completed job                                              |
| GET    | /jobs/{job_id} | Get the status of a specific job                                            |

Sample curl commands

curl http://127.0.0.1:8000/health

curl http://127.0.0.1:8000/settings

curl -X POST http://127.0.0.1:8000/submit \
  -H 'Content-Type: application/json' \
  -d '{"job_id":"job-001","variables":{"MY_INPUT_A":{"value":3.14},"MY_INPUT_B":{"value":[10,20,30,40,50]}}}'

curl -X POST http://127.0.0.1:8000/get \
  -H 'Content-Type: application/json' \
  -d '{"variables":["MY_INPUT_A","MY_OUTPUT"]}'

curl -X POST http://127.0.0.1:8000/jobs \
  -H 'Content-Type: application/json' \
  -d '{"jobs":[{"job_id":"job-002","variables":{"MY_INPUT_A":{"value":1.23}}},{"job_id":"job-003","variables":{"MY_INPUT_A":{"value":4.56}}}]}'

curl http://127.0.0.1:8000/jobs/next

curl http://127.0.0.1:8000/jobs/job-001
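
The same flow is easy to script. The sketch below mirrors the curl examples using only the standard library: it submits one job, then polls its status by ID. The "status" values read from the response are assumptions based on the job lifecycle described below; adjust them to the actual payload shape.

# Submit a job and poll until it is no longer queued or running.
import json
import time
from urllib.request import Request, urlopen

BASE = "http://127.0.0.1:8000"


def post_json(path, payload):
    req = Request(
        BASE + path,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urlopen(req) as resp:
        return json.load(resp)


def get_json(path):
    with urlopen(BASE + path) as resp:
        return json.load(resp)


post_json("/submit", {
    "job_id": "job-001",
    "variables": {
        "MY_INPUT_A": {"value": 3.14},
        "MY_INPUT_B": {"value": [10, 20, 30, 40, 50]},
    },
})

while True:
    job = get_json("/jobs/job-001")
    if job.get("status") not in ("queued", "running"):  # assumed field name
        break
    time.sleep(0.5)

print(job)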

Error Codes

| Code | Condition                                                                   |
|------|-----------------------------------------------------------------------------|
| 403  | Write to a read-only variable (mode: out)                                   |
| 404  | Unknown variable name, unknown job ID, or no completed jobs for /jobs/next  |
| 409  | Duplicate job ID                                                            |
| 422  | Type validation failure (e.g. wrong shape, non-numeric value)               |
| 429  | Input queue full                                                            |
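
A 429 can be treated as a back-pressure signal: wait and resubmit rather than fail. A minimal retry sketch (the attempt count and delay are arbitrary choices):

# Retry a submission while the input queue is full (HTTP 429).
import json
import time
from urllib.error import HTTPError
from urllib.request import Request, urlopen


def submit_with_retry(payload, attempts=5, delay_s=1.0):
    req = Request(
        "http://127.0.0.1:8000/submit",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    for _ in range(attempts):
        try:
            with urlopen(req) as resp:
                return json.load(resp)
        except HTTPError as err:
            if err.code != 429:  # only retry on a full input queue
                raise
            time.sleep(delay_s)
    raise RuntimeError("input queue still full after retries")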

Job Lifecycle & Tracking

Jobs submitted via /submit or /jobs follow this lifecycle:

submit → queued → running → completed

1. Queued: the job is validated and placed in the input queue.

2. Running: on each clock tick, one queued job is transitioned to running and its input values are loaded into the variable store for the pipeline to process.

3. Completed: when the pipeline writes results back via put_many, the oldest running job is marked as completed and its outputs are recorded.

Completed jobs can be retrieved via GET /jobs/next (FIFO dequeue) or GET /jobs/{job_id} (by ID).
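
A downstream consumer can therefore drain completed jobs in FIFO order by calling GET /jobs/next until it returns 404 (no completed jobs left, per the error table above). A minimal sketch:

# Dequeue all currently completed jobs; a 404 from /jobs/next means empty.
import json
from urllib.error import HTTPError
from urllib.request import urlopen


def drain_completed(base="http://127.0.0.1:8000"):
    results = []
    while True:
        try:
            with urlopen(base + "/jobs/next") as resp:
                results.append(json.load(resp))
        except HTTPError as err:
            if err.code == 404:  # nothing left to dequeue
                return results
            raise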

Note

Current tracking limitation (Stage 1 / v1.7.3+): Job tracking is approximated using FIFO ordering. The pipeline's transformers strip message metadata, so the job_id is typically not propagated through to put_many. Instead, the system falls back to FIFO matching: the oldest running job is assumed to be the one that just completed. To keep this assumption valid, the clock-driven path transitions only one queued job per tick to the running state.

This is reliable for single-job-at-a-time workloads but does not support true concurrent job tracking.

Planned improvement (Stage 2 / v1.8+): Proper job tracking will be integrated via trace propagation across the message broker. Each job's job_id will be carried through the full pipeline in struct metadata, enabling accurate matching of results to jobs even under concurrent load.