Query

Defines a search query against an object storage container (e.g., AWS S3 bucket) index.

Queries can execute periodically (e.g., k8s CronJob) or on demand via the REST API to populate log analytics dashboards, alerts, and ad-hoc queries.

Each query specifies criteria and actions.

Criteria select events to fetch and transform into TenXObjects:

  • Target: target app/service whose events to query (e.g., 'myApp').
  • Time range: time range of the log/trace events to fetch.
  • Terms: search terms to execute against the underlying storage.
  • Filters: list of JavaScript expressions that allow more complex filtering of the TenXObjects matching the search terms.

Actions specify how to further filter, enrich, aggregate, and stream TenXObjects to log analyzers and metric outputs (e.g., AWS CloudWatch, Prometheus).

Architecture

A distributed Stream Processing Architecture executes queries via parallel scan and stream workers to select, transform and stream TenXObjects to target output(s).

Scan

Scan workers process TenXTemplate Filters to identify blobs matching a target prefix (i.e., app/service name), time frame (e.g., last 10min) and search terms (e.g., ERROR).

Multiple scan workers perform this workload in parallel, submitting matching byte ranges to stream workers via SQS.
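
As a rough illustration of how a scan worker might test an index entry before any blob bytes are read, the sketch below implements a tiny Bloom filter in Python. It is not the TenXTemplate Filter format: the `BloomFilter` class, its sizing, the SHA-256 based hashing, and the `blobs_to_fetch` helper are all assumptions made for this example.

```python
import hashlib


class BloomFilter:
    """Toy Bloom filter: may report false positives, never false negatives."""

    def __init__(self, size_bits: int = 1024, num_hashes: int = 3) -> None:
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, term: str):
        # Derive num_hashes bit positions by salting SHA-256 with an index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{term}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, term: str) -> None:
        for pos in self._positions(term):
            self.bits |= 1 << pos

    def might_contain(self, term: str) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(term))


def blobs_to_fetch(index: dict, terms: list) -> list:
    """Return names of blobs whose filter may contain every search term."""
    return [name for name, flt in index.items()
            if all(flt.might_contain(t) for t in terms)]
```

A blob whose filter rejects a term can be skipped without reading it; a blob that passes may still contain no match, which is why exact filtering happens later on the fetched data.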

Stream

Stream workers read object storage blob byte ranges identified by the scan workers and transform them into TenXObjects on which they perform the actions specified by the query.

Architecture Flow

The following diagram illustrates the complete distributed, parallel architecture for query execution:

graph TD
    Client["🔍 Query Request<br/>API /search"] --> S3["☁️ S3 Object Storage<br/>Log Files + Bloom Filters"]

    S3 --> ScanLayer

    subgraph ScanLayer["⚡ Parallel Scan Functions<br/> "]
        Scan1["Scan #1<br/>Time: 00:00-05:00<br/>• Filter prefix<br/>• Test Bloom filters<br/>• Find byte ranges"]
        Scan2["Scan #2<br/>Time: 05:00-10:00<br/>• Filter prefix<br/>• Test Bloom filters<br/>• Find byte ranges"]
        Scan3["Scan #3<br/>Time: 10:00-15:00<br/>• Filter prefix<br/>• Test Bloom filters<br/>• Find byte ranges"]
        ScanN["Scan #N<br/>Time: N+5:00<br/>• Filter prefix<br/>• Test Bloom filters<br/>• Find byte ranges"]
    end

    ScanLayer --> StreamLayer

    subgraph StreamLayer["🌊 Parallel Stream Functions<br/> "]
        Stream1["Stream #1<br/>• Fetch ranges<br/>• Parse logs<br/>• Create TenXObjects<br/>• Filter & enrich"]
        Stream2["Stream #2<br/>• Fetch ranges<br/>• Parse logs<br/>• Create TenXObjects<br/>• Filter & enrich"]
        Stream3["Stream #3<br/>• Fetch ranges<br/>• Parse logs<br/>• Create TenXObjects<br/>• Filter & enrich"]
        StreamM["Stream #M<br/>• Fetch ranges<br/>• Parse logs<br/>• Create TenXObjects<br/>• Filter & enrich"]
    end

    StreamLayer --> OutputLayer

    subgraph OutputLayer["📊 Output Targets<br/> "]
        Elastic["Elasticsearch<br/>Events & Logs"]
        Prometheus["Prometheus<br/>Metrics & Gauges"]
        Datadog["Datadog<br/>Custom Metrics"]
        Splunk["Splunk<br/>Search Indexes"]
    end

    %% Elegant dark theme styling
    classDef default fill:#2d3748,stroke:#4a5568,stroke-width:2px,color:#e2e8f0
    classDef client fill:#2b6cb0,stroke:#3182ce,stroke-width:2px,color:#ffffff
    classDef storage fill:#d69e2e,stroke:#ed8936,stroke-width:2px,color:#ffffff
    classDef scan fill:#38a169,stroke:#48bb78,stroke-width:2px,color:#ffffff
    classDef stream fill:#d53f8c,stroke:#ed64a6,stroke-width:2px,color:#ffffff
    classDef output fill:#805ad5,stroke:#9f7aea,stroke-width:2px,color:#ffffff

    class Client client
    class S3 storage
    class Scan1,Scan2,Scan3,ScanN scan
    class Stream1,Stream2,Stream3,StreamM stream
    class Elastic,Prometheus,Datadog,Splunk output

Benefits

  • Scalability: Functions auto-scale based on query complexity and data volume
  • Parallelism: Time-based partitioning enables concurrent processing across multiple workers
  • Efficiency: Bloom filters minimize unnecessary data reads from object storage
  • Flexibility: Multiple output targets support different analytics use cases (logs, metrics, alerts)
  • Cost-Effective: SQS-based execution model scales with demand

Configuration

To configure the Object storage query input module, edit these settings.

Below is the default configuration from: query/config.yaml (* Required Fields).

# 🔟❎ 'run' Object storage query AWS S3 configuration

# Configure query AWS S3 bucket settings
# To learn more see https://doc.log10x.com/run/input/objectStorage/query/

# Set the 10x pipeline to 'run'
tenx: run

# =============================== Dependencies ================================

include:
  - run/modules/input/objectStorage/input
  - run/modules/input/objectStorage/query

# =============================== Query Options ===============================

query:

    # 'name' specifies a logical name associated with this query
  - name: my-query

    # 'readPrintProgress' sets whether this input prints throughput stats to the console
    readPrintProgress: $=!TenXEnv.get("quiet")

    # ---------------------------- Storage Options ----------------------------

    # 'objectStorage' selects the underlying storage (e.g., S3). To learn more see: https://doc.log10x.com/run/input/objectStorage

    # Authentication is made via the default provider credentials chain:
    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/DefaultCredentialsProvider.html

    # Region selection is made via the default region provider chain:
    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html

    objectStorageName: AWS

    # 'readContainer' specifies the container (e.g., S3 bucket) in which to search
    readContainer: $=TenXEnv.get("TENX_STREAMER_INPUT_BUCKET", "")  # (❗ REQUIRED)

    # 'indexContainer' specifies the container (e.g., S3 bucket) in which index objects created by the 'index' module reside.
    indexContainer: $=TenXEnv.get("TENX_STREAMER_INDEX_BUCKET", "") # (❗ REQUIRED)

    # ----------------------------- Search Options ----------------------------

    # 'target' specifies a user app/service name (e.g., 'acme-client') to search in.
    target: app          # (❗ REQUIRED)

    # 'from' sets the query's beginning epoch time range (inclusive).
    #  The 'now' function returns the current epoch value with an optional positive/negative offset.
    from: now("-5m")   # (❗ REQUIRED)

    # 'to' sets the end epoch time range (exclusive), must be greater than 'from'.
    to: now()          # (❗ REQUIRED)

    # 'search' specifies a search expression applied to raw events in the underlying storage.
    #  Supports basic operations: `&&` (AND), `||` (OR), == (equality), and includes(field, text) (substring match).
    search: (severity_level == "ERROR") || (severity_level == "FATAL")

    # 'filters' specify expressions TenXObjects transformed from matching events in storage must meet
    #  To learn more see https://doc.log10x.com/run/output/regulate/#filter-expressions
    filters: []

    # ----------------------------- Limit Options ------------------------------

    # 'limit' options cap the query's processing time and the volume of results it will return
    limit:

      # 'processingTime' specifies the time in ms allotted for this query to complete.
      #  Once this period has elapsed the query will terminate regardless of
      #  whether 'resultSize' has been reached.
      processingTime: parseDuration("1m")

      # 'resultSize' specifies the volume in bytes of events matching the target app name, terms and time range to fetch.
      #  Once this volume has been reached the query will terminate.
      resultSize: parseBytes("10MB")

Options

Specify the options below to configure one or more Object storage query inputs:

| Name | Description | Category |
|------|-------------|----------|
| queryName | Logical name for the query | General |
| queryReadPrintProgress | Sets whether this input prints throughput stats to the console | General |
| queryTarget | Logical prefix to filter index objects by | Filter |
| querySearch | List of terms to search for | Filter |
| queryFilters | List of JavaScript expressions applied in-memory to matching TenXObjects | Filter |
| queryFrom | Beginning of the search time range (inclusive) | Filter |
| queryTo | End of the search time range (exclusive) | Filter |
| queryObjectStorageName | Object storage logical name | Access |
| queryIndexContainer | Object storage container containing index objects | Scan |
| queryScanFunctionParallelTimeslice | Max time range to process by a scan worker instance | Scan |
| queryScanFunctionParallelMaxInstances | Max number of instances of the scan worker | Scan |
| queryScanFunctionParallelThreads | Max number of threads allocated per each 'scan' worker instance | Scan |
| queryScanFlushInterval | Interval to wait before posting pending 'stream' worker requests | Scan |
| queryScanFunctionUrl | Scan worker endpoint URL | Scan |
| queryReadContainer | Object storage container holding the input objects to read | Stream |
| queryStreamFunctionParallelObjects | Max number of input object byte ranges to process in a single 'stream' instance | Stream |
| queryStreamFunctionParallelByteRange | Max input object bytes for a single fetch request | Stream |
| queryStreamFunctionUrl | Stream worker endpoint URL | Stream |
| queryObjectStorageArgs | Custom object storage args | Advanced |
| queryFilterVars | Specifies a list of variable values to scan for | Advanced |
| queryFilterTemplateHashes | Specifies a list of TenXTemplate hash values to scan for | Advanced |
| queryID | Unique ID of host query | Advanced |
| queryElapseTime | Epoch after which processing times out | Advanced |
| queryScanFunctionLimitResultSizeInterval | Interval by which to check whether 'queryLimitResultSize' is exceeded | Advanced |
| queryLimitProcessingTime | Max milliseconds allotted for this query | Limit |
| queryLimitResultSize | Max byte volume of matching events to fetch | Limit |

General

queryName

Logical name for the query.

Type Default Category
String "" General

Specifies a logical name associated with the query (e.g., errorsQuery). This value identifies this query operation in aggregated metrics reports. If not specified, defaults to a concatenation of queryTarget and querySearch.

queryReadPrintProgress

Sets whether this input prints throughput stats to the console.

Type Default Category
Boolean false General

Sets whether this input prints throughput stats to the console. This value is commonly used when testing an integration to a remote endpoint.

Filter

queryTarget

Logical prefix to filter index objects by.

Type Required Category
String Filter

Specifies a logical name associated with the search target. This value identifies the app that produced the queried events (e.g., acme-client).

This value corresponds with the indexWriteTarget argument.

querySearch

List of terms to search for.

Type Default Category
String "" Filter

Specifies a search expression applied to the underlying storage via TenXTemplate filters for efficient pre-filtering. Supports only basic operations: && (AND), || (OR), == (equality), and includes (substring match).

Example:

(level==INFO || level==WARN) && includes(text, "POST")

This narrows down data fetched from storage before in-memory processing. To perform more complex in-memory filtering, specify queryFilters.

queryFilters

List of JavaScript expressions applied in-memory to matching TenXObjects.

Type Default Category
List [] Filter

Specifies JavaScript expressions applied in-memory to all TenXObjects that match the querySearch expression.

Supports all valid TenX JavaScript expressions for complex filtering, enrichment, and processing.

This provides precise, exact filtering after data is fetched from storage.

queryFilters:
  - LevelTemplate.level == "ERROR" 
  - Math.max(this.price, 100)
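
The relationship between querySearch and queryFilters can be pictured as a two-stage pipeline: a coarse predicate narrows what is fetched, then exact predicates run on what remains. The Python sketch below only mimics that flow on plain dictionaries; `run_query`, the sample events, and the `price` field are hypothetical and do not reflect how TenXObjects are represented.

```python
def run_query(events, search, filters):
    """Apply a coarse search predicate, then exact in-memory filters."""
    candidates = [e for e in events if search(e)]  # stage 1: pre-filter
    return [e for e in candidates if all(f(e) for f in filters)]  # stage 2


events = [
    {"level": "ERROR", "text": "POST /cart failed", "price": 120},
    {"level": "INFO", "text": "GET /health ok", "price": 0},
    {"level": "WARN", "text": "POST /login slow", "price": 5},
]


def search(event):
    # Stand-in for: (level==INFO || level==WARN) && includes(text, "POST")
    return event["level"] in ("INFO", "WARN") and "POST" in event["text"]


def cheap_item(event):
    # Stand-in for an in-memory filter expression.
    return event["price"] < 100


matches = run_query(events, search, [cheap_item])
```

Here only the WARN event passes both stages: the ERROR event is rejected by the search predicate and the INFO event does not contain "POST".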

queryFrom

Beginning of the search time range (inclusive).

Type Required Category
String Filter

Defines the beginning of the search time range (inclusive) expressed as a UNIX millisecond epoch value.

This value commonly uses the now function.

For example, to express the value of one hour before the current millisecond epoch, specify:

queryFrom: now("-1h")

queryTo

End of the search time range (exclusive).

Type Required Category
String Filter

Defines the end of the search time range (exclusive) expressed as a UNIX millisecond epoch value.

This value commonly uses the now function.

For example, to express the value of the current millisecond epoch, specify:

queryTo: now()
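
To make the inclusive/exclusive semantics of queryFrom and queryTo concrete, here is a small Python sketch of a `now`-style helper and a half-open range check. The offset grammar it accepts (an optional sign, an integer, and one of `s`, `m`, `h`, `d`) is an assumption for illustration; the offsets accepted by the actual `now` function are not specified in this section.

```python
import re
import time

_UNITS_MS = {"s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}


def now(offset: str = "", current_ms=None) -> int:
    """Return epoch milliseconds, shifted by an offset such as "-5m" or "+1h"."""
    base = int(time.time() * 1000) if current_ms is None else current_ms
    if not offset:
        return base
    match = re.fullmatch(r"([+-]?)(\d+)([smhd])", offset)
    if match is None:
        raise ValueError(f"unsupported offset: {offset!r}")
    sign, amount, unit = match.groups()
    delta = int(amount) * _UNITS_MS[unit]
    return base - delta if sign == "-" else base + delta


def in_range(timestamp_ms: int, start_ms: int, end_ms: int) -> bool:
    """Half-open check: start is inclusive, end is exclusive."""
    return start_ms <= timestamp_ms < end_ms
```

With this convention an event stamped exactly at queryFrom is selected, while one stamped exactly at queryTo is not, so back-to-back periodic queries do not return the same event twice.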

Access

queryObjectStorageName

Object storage logical name.

Type Required Category
String Access

Identifies the Object Storage against which this query is made (e.g., AWS).

Scan

queryIndexContainer

Object storage container containing index objects.

Type Required Category
String Scan

Specifies the name of the object storage container (e.g., AWS S3 bucket) in which the TenXTemplate Filters are stored.

This value corresponds with the indexWriteContainer.

queryScanFunctionParallelTimeslice

Max time range to process by a scan worker instance.

Type Default Category
String "" Scan

Specifies the max time duration (e.g., 1min) an instance of the scan worker will traverse within the index. For example, if a query specifies a time range of 15min and this value is set to 1m, 15 instances of the scan worker will execute in parallel to scan through the index. If this value is 0, a single worker instance will sequentially scan through the index.
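
The partitioning described above can be sketched in a few lines of Python. `split_time_range` is a hypothetical helper, not part of the product; it treats each slice as a half-open interval and falls back to a single slice when the slice length is 0.

```python
def split_time_range(start_ms: int, end_ms: int, slice_ms: int) -> list:
    """Split [start_ms, end_ms) into consecutive half-open slices."""
    if end_ms <= start_ms:
        raise ValueError("end_ms must be greater than start_ms")
    if slice_ms <= 0:
        return [(start_ms, end_ms)]  # 0 means one sequential worker
    slices = []
    cursor = start_ms
    while cursor < end_ms:
        upper = min(cursor + slice_ms, end_ms)
        slices.append((cursor, upper))
        cursor = upper
    return slices
```

For a 15-minute range and a 1-minute slice this yields 15 slices, one per scan worker instance.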

queryScanFunctionParallelMaxInstances

Max number of instances of the scan worker.

Type Default Category
Number 1000 Scan

Specifies the maximum number of scan worker instances that will traverse the index. If the provided time range and time slice would result in more instances than this value, the time slice increases until the number of workers needed fits within this value.

For example, if a query specifies a time range of 60min and a time slice of 1m, while this value is set to 25, the time slice will be effectively increased to 3m to accommodate this.

The maximum value for this option is: 10,000.
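
One way to reproduce the 60min / 1m / 25 example is to grow the slice in whole multiples of the configured time slice. That rounding rule is an assumption chosen to match the documented result of 3m; the actual adjustment logic may differ. `effective_timeslice_ms` is a hypothetical name.

```python
import math


def effective_timeslice_ms(range_ms: int, slice_ms: int, max_instances: int) -> int:
    """Grow the slice in whole multiples of slice_ms until instances fit the cap."""
    if slice_ms <= 0:
        return range_ms  # single sequential worker
    needed = math.ceil(range_ms / slice_ms)
    if needed <= max_instances:
        return slice_ms
    # Smallest multiple of slice_ms that yields at most max_instances workers.
    factor = math.ceil(needed / max_instances)
    return slice_ms * factor
```

With a 60-minute range, a 1-minute slice and a cap of 25, 60 instances would be needed, so the factor is ceil(60 / 25) = 3 and the effective slice is 3 minutes (20 instances).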

queryScanFunctionParallelThreads

Max number of threads allocated per each 'scan' worker instance.

Type Default Category
Number 0 Scan

Sets the number of JVM threads used to scan through a time slice of the index within a scan worker instance. For example, if the time slice is 10min and this value is set to 10, each 1min index time range is scanned by a dedicated thread. If set to 1 or 0, a single thread scans through the entire index time range (e.g., 10min).
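
The per-thread split inside one worker can be sketched as follows. The scan workers themselves run on the JVM; this Python version only illustrates the idea, and `scan_slice` and its `scan_fn` callback are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def scan_slice(start_ms: int, end_ms: int, threads: int, scan_fn):
    """Scan [start_ms, end_ms) with up to `threads` threads, one sub-range each."""
    if end_ms <= start_ms:
        raise ValueError("end_ms must be greater than start_ms")
    workers = max(threads, 1)  # 0 or 1 both mean a single thread
    step = -(-(end_ms - start_ms) // workers)  # ceiling division
    sub_ranges = [(s, min(s + step, end_ms))
                  for s in range(start_ms, end_ms, step)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(lambda r: scan_fn(*r), sub_ranges))
    return sub_ranges, results
```

Calling `scan_slice(0, 600_000, 10, fn)` hands each of ten threads a 60,000 ms sub-range, while a thread count of 0 or 1 passes the whole slice to a single call.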

queryScanFlushInterval

Interval to wait before posting pending 'stream' worker requests.

Type Default Category
String "" Scan

Specifies a maximum interval (e.g., 2sec) when scanning an index time slice to wait to submit pending stream worker requests. If not set, all stream worker requests are sent after the index scanning process for a target index time slice has finished.

queryScanFunctionUrl

Scan worker endpoint URL.

Type Required Category
String Scan

Specifies the endpoint used to invoke the scan worker.

Stream

queryReadContainer

Object storage container holding the input objects to read.

Type Required Category
String Stream

Specifies the object storage container (e.g., S3 bucket) name where input objects (e.g., log file) to scan are stored.

This value corresponds with the indexWriteContainer.

queryStreamFunctionParallelObjects

Max number of input object byte ranges to process in a single 'stream' instance.

Type Default Category
Number 0 Stream

Defines the maximum number of input object byte ranges processed by a single instance of the stream worker.

For example, if set to 50 and the scan phase has identified 200 potentially matching input object byte ranges, four instances of the stream worker will split the workload in parallel.
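
The arithmetic in this example amounts to chunking the list of byte ranges. A minimal Python sketch follows; `batch_ranges` is a hypothetical helper, and treating a value of 0 as "one batch for everything" is an assumption, since the behavior of the default is not described here.

```python
def batch_ranges(byte_ranges: list, max_per_instance: int) -> list:
    """Group byte ranges into batches, one batch per stream worker instance."""
    if max_per_instance <= 0:
        # Assumption: no limit configured, so a single batch holds everything.
        return [byte_ranges] if byte_ranges else []
    return [byte_ranges[i:i + max_per_instance]
            for i in range(0, len(byte_ranges), max_per_instance)]
```

With 200 ranges and a limit of 50, the helper returns four batches of 50 ranges each.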

queryStreamFunctionParallelByteRange

Max input object bytes for a single fetch request.

Type Required Category
String Stream

Defines the max byte range within an input object (e.g., log file) that can be fetched via a single request.

For example, if this value is 5MB and two matching 1MB byte ranges spaced 2MB apart are identified within a target object, both are fetched from storage via a single operation. If the gap is large enough that the combined span exceeds this value, two instances of the stream worker are launched.
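
A possible reading of this rule is that two ranges are fetched together only when the span covering both fits within the configured maximum. The Python sketch below implements that reading; `coalesce_ranges` is hypothetical, and the choice of 1 MB = 1024 * 1024 bytes is an assumption.

```python
MB = 1024 * 1024  # assumption: binary megabytes


def coalesce_ranges(ranges: list, max_fetch_bytes: int) -> list:
    """Merge (offset, length) ranges when the merged span fits in one fetch."""
    merged = []  # list of (start, end) spans
    for offset, length in sorted(ranges):
        end = offset + length
        if merged:
            start, prev_end = merged[-1]
            if max(end, prev_end) - start <= max_fetch_bytes:
                merged[-1] = (start, max(end, prev_end))
                continue
        # A range larger than max_fetch_bytes is kept as-is, not split.
        merged.append((offset, end))
    return [(start, end - start) for start, end in merged]
```

Two 1 MB ranges separated by a 2 MB gap span 4 MB and collapse into one fetch under a 5 MB limit; with a 4 MB gap the span is 6 MB and the ranges stay separate.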

queryStreamFunctionUrl

Stream worker endpoint URL.

Type Required Category
String Stream

Specifies the endpoint used to invoke the stream worker.

Advanced

queryObjectStorageArgs

Custom object storage args.

Type Default Category
List [] Advanced

Custom arguments passed as a map to the constructor of the underlying object storage. This list is expected to hold pairs of key values (e.g., args: [key1, value1, key2, value2]).
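
The flat key/value list convention can be illustrated with a short Python helper that pairs adjacent items. `args_to_map` is a hypothetical name used only for this example.

```python
def args_to_map(args: list) -> dict:
    """Convert [key1, value1, key2, value2, ...] into {key1: value1, ...}."""
    if len(args) % 2 != 0:
        raise ValueError("expected an even number of items (key/value pairs)")
    return dict(zip(args[0::2], args[1::2]))
```

For instance, `args_to_map(["key1", "value1", "key2", "value2"])` returns `{"key1": "value1", "key2": "value2"}`.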

queryFilterVars

Specifies a list of variable values to scan for.

Type Default Category
List [] Advanced

Specifies a list of variable values to scan for.

queryFilterTemplateHashes

Specifies a list of TenXTemplate hash values to scan for.

Type Default Category
List [] Advanced

Specifies a list of TenXTemplate hash values to scan for.

queryID

Unique ID of host query.

Type Default Category
String "" Advanced

UUID identifying this query. This value is automatically assigned if not explicitly set.

queryElapseTime

Epoch after which processing times out.

Type Default Category
Number 0 Advanced

Epoch value after which this request is cancelled, that is, once the current system time exceeds it.

queryScanFunctionLimitResultSizeInterval

Interval by which to check whether 'queryLimitResultSize' is exceeded.

Type Default Category
String "" Advanced

Interval at which to check whether queryLimitResultSize has been exceeded, that is, whether the query has returned at least the requested result volume.

Limit

queryLimitProcessingTime

Max milliseconds allotted for this query.

Type Required Category
String Limit

Milliseconds allotted for this query to complete. Once this period has elapsed the query will terminate regardless of whether queryLimitResultSize has been reached.

queryLimitResultSize

Max byte volume of matching events to fetch.

Type Required Category
String Limit

Volume in bytes of events matching the app prefix, terms and time range to fetch. Once this volume has been reached the query will terminate.

If queryLimitProcessingTime has elapsed before the requested result volume has been fetched, the query will still terminate.
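
The interplay of the two limits can be sketched as a loop that stops on whichever budget runs out first. This is a single-process Python illustration only; `collect_results`, its injectable `clock`, and measuring volume as UTF-8 byte length are assumptions, and the distributed workers may enforce the limits differently.

```python
import time


def collect_results(events, max_ms: int, max_bytes: int, clock=time.monotonic):
    """Collect events until the time budget or the byte budget is used up."""
    deadline = clock() + max_ms / 1000.0
    collected, total_bytes = [], 0
    for event in events:
        if clock() >= deadline:
            return collected, "processingTime"
        collected.append(event)
        total_bytes += len(event.encode("utf-8"))
        if total_bytes >= max_bytes:
            return collected, "resultSize"
    return collected, "completed"
```

The second return value names the reason the loop ended, which makes it easy to tell a truncated result set from a complete one.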


This module is defined in query/module.yaml.