Berserk Docs

Query gRPC API

Streaming query execution over gRPC

The query service exposes a gRPC API on port 9510 alongside the HTTP API. The gRPC API provides streaming query results, making it suitable for large result sets and real-time progress updates.

QueryService

service QueryService {
  rpc ExecuteQuery(ExecuteQueryRequest) returns (stream ExecuteQueryResultFrame);
}

ExecuteQuery returns a stream of frames. Clients receive schema, row batches, progress updates, and metadata as separate frames, allowing incremental rendering of results.

Frame Types

Frame — Description
schema — Table schema (sent before any row batches for that table)
batch — Row data. Each batch has a result_iteration_id — when the ID changes, discard previous rows
progress — Execution statistics: rows processed, chunks scanned/skipped, timing
done — Query complete, close the stream
error — Error with code, message, and source location
metadata — Warnings and visualization hints

Result Iterations

Row batches carry a result_iteration_id. When a new iteration starts, all previous rows should be discarded. This enables incremental refinement of results. The is_iteration_complete flag on each batch indicates whether the iteration is fully delivered.

Proto Definition

// The query service executes BQL queries and streams results back to the client.
service QueryService {
  // Execute a BQL query and receive results as a stream of typed frames.
  // The stream delivers Schema, RowBatch, Progress, and Metadata frames,
  // terminated by a Completion frame or an Error frame.
  // The server closes the stream after the terminal frame; clients should
  // treat stream closure without a Completion/Error frame as an abnormal end.
  rpc ExecuteQuery(ExecuteQueryRequest) returns (stream ExecuteQueryResultFrame);
}

// Request to execute a BQL query.
message ExecuteQueryRequest {
  // The BQL query string to execute.
  string query = 1;

  // Start of the time range (inclusive). Accepts relative expressions like "1h ago"
  // or absolute timestamps like "2024-01-01T00:00:00Z".
  // NOTE(review): the default when empty is not stated here — confirm server behavior.
  string since = 2;

  // End of the time range (exclusive). Same format as `since`. Defaults to now.
  string until = 3;

  // IANA timezone name for time-based operations (e.g., "America/New_York").
  // Defaults to UTC if not specified.
  string timezone = 4;
}

// A single frame in the streaming response for a query.
// Frames arrive in order: Schema frames first, then interleaved RowBatch and Progress frames,
// and finally a Completion or Error frame.
message ExecuteQueryResultFrame {
  // Unique identifier for this request, echoed from the server.
  string request_id = 1;

  // Exactly one payload variant is set per frame.
  // (Members are declared in field-number order; numbers are unchanged, so this
  // ordering is purely a source-level cleanup and is wire-compatible.)
  oneof payload {
    // Table schema — sent once per table before any RowBatch frames for that table.
    TableSchema schema = 2;

    // A batch of result rows. Multiple batches may arrive for the same table and iteration.
    // When `result_iteration_id` changes, discard all previous rows and start fresh.
    RowBatch batch = 3;

    // Cumulative execution statistics. Sent periodically during query execution.
    // Each Progress frame supersedes the previous one.
    Progress progress = 4;

    // Signals that the query has completed and no more frames will be sent.
    Completion done = 5;

    // A query execution error. The stream ends after this frame.
    Error error = 6;

    // Warnings, partial failures, and visualization hints for the current result set.
    ResultMetadata metadata = 7;
  }
}

// Schema definition for a result table.
// Sent once per table, before any RowBatch frames referencing that table name.
message TableSchema {
  // Table name (e.g., "PrimaryResult", "ExtraTable_0", or a fork branch name).
  // RowBatch.table_name values refer back to this name.
  string name = 1;

  // Ordered list of columns in this table. ValueRow cells follow this order.
  repeated Column columns = 2;
}

// A column definition within a table schema.
message Column {
  // Column name. Unique within its table's schema — TODO confirm with the server.
  string name = 1;

  // Data type of the column.
  ColumnType type = 2;

  // Whether the column may contain null values.
  bool nullable = 3;
}

// BQL data types for column definitions.
// Receivers should tolerate unknown values (future types) gracefully.
enum ColumnType {
  // Default zero value; never a valid column type.
  COLUMN_TYPE_UNSPECIFIED = 0;
  // Boolean.
  COLUMN_TYPE_BOOL = 1;
  // 32-bit integer.
  COLUMN_TYPE_INT = 2;
  // 64-bit integer.
  COLUMN_TYPE_LONG = 3;
  // Floating-point number.
  COLUMN_TYPE_REAL = 4;
  // UTF-8 text.
  COLUMN_TYPE_STRING = 5;
  // Absolute timestamp.
  COLUMN_TYPE_DATETIME = 6;
  // Time span / duration.
  COLUMN_TYPE_TIMESPAN = 7;
  // Globally unique identifier.
  COLUMN_TYPE_GUID = 8;
  // Dynamically-typed value (e.g., nested JSON-like data).
  COLUMN_TYPE_DYNAMIC = 9;
}

// A batch of rows belonging to a single table and result iteration.
message RowBatch {
  // Name of the table this batch belongs to (matches a TableSchema.name).
  string table_name = 1;

  // Opaque identifier for the current result iteration. When this value changes,
  // all previously received rows for all tables must be discarded — the new iteration
  // represents a more complete result set that supersedes the previous one.
  string result_iteration_id = 2;

  // Rows in this batch. Column order matches the TableSchema for this table.
  repeated ValueRow rows = 3;

  // True when this is the last batch for this table in the current iteration.
  // Clients should show the first batch immediately for fast feedback, then
  // accumulate subsequent batches until this flag is true.
  bool is_iteration_complete = 4;
}

// A single row of dynamically-typed values.
message ValueRow {
  // Cell values in column order. Each value corresponds to the column at the same
  // index in the TableSchema.
  // NOTE(review): berserk.BqlValue is defined in another file; this listing does not
  // show the corresponding import — confirm it exists in the full .proto.
  repeated berserk.BqlValue values = 1;
}

// Cumulative execution statistics for the running query.
// Each Progress frame contains the total counts since the query started —
// always use the latest frame and discard earlier ones.
// All *_nanos fields are in nanoseconds; all *_size fields are in bytes.
message Progress {
  // Total rows processed across all chunks.
  uint64 rows_processed = 1;

  // Total number of chunks in the query's time range.
  // Expected to equal scanned + skipped (all categories) when the query
  // finishes — TODO confirm the accounting invariant with the server.
  uint64 chunks_total = 2;

  // Chunks that were scanned (read and evaluated).
  uint64 chunks_scanned = 3;

  // Chunks skipped because their time range didn't overlap the query range.
  uint64 chunks_skipped_range = 4;

  // Chunks skipped by bloom filter (no matching values).
  uint64 chunks_skipped_bloom = 5;

  // Chunks skipped by shard hash (not matching the target shard).
  uint64 chunks_skipped_shard = 6;

  // Total predicate evaluations performed during scanning.
  uint64 predicate_checks = 7;

  // Field 8 ("bloom_checks") was removed; the number and name are reserved so
  // they cannot be accidentally reused with different semantics.
  reserved 8;
  reserved "bloom_checks";

  // True when the query completed early via short-circuit optimization.
  bool short_circuit_completion = 9;

  // Total uncompressed bytes of scanned chunk bodies.
  uint64 chunk_scanned_raw_body_size = 10;

  // Total uncompressed bytes of skipped chunk bodies.
  uint64 chunk_skipped_raw_body_size = 11;

  // Total compressed bytes of skipped chunks.
  uint64 chunk_skipped_compressed_size = 12;

  // Wall-clock time spent scanning chunks (nanoseconds).
  uint64 chunk_scan_time_nanos = 13;

  // Total query execution time (nanoseconds).
  uint64 query_time_nanos = 14;

  // Total compressed bytes of scanned chunks.
  uint64 chunk_scanned_compressed_size = 15;

  // Per-bin completion progress for `summarize ... by bin()` queries.
  // Absent for queries without time binning.
  optional BinProgress bin_progress = 16;

  // Time spent waiting in the query queue before execution started (nanoseconds).
  // Present when the query was queued due to concurrent query limits.
  optional uint64 queue_wait_nanos = 17;

  // Segment planning progress. Present during planning, absent after planning completes.
  optional PlanningProgress planning_progress = 18;

  // Total bytes of bloom filter data evaluated during bloom filtering.
  uint64 bloom_filter_bytes = 19;

  // Wall-clock time spent in merge and delivery across all query threads (nanoseconds).
  uint64 merge_time_nanos = 20;

  // Chunks that were scanned but yielded zero matching rows (false positives from pre-filtering).
  uint64 chunks_empty_scan = 21;

  // Chunks that encountered errors during row processing (e.g., type conversion failure).
  uint64 chunks_errored = 22;

  // Chunks skipped because required input fields were absent from the chunk schema.
  uint64 chunks_skipped_required_fields = 23;

  // Per-operator diagnostic telemetry in a stable key-value envelope.
  repeated OperatorDiagnostics operator_diagnostics = 24;
}

// Diagnostic telemetry for a specific query operator.
// The key-value envelope keeps the wire format stable while allowing each
// operator kind to report its own set of metrics.
message OperatorDiagnostics {
  // Operator kind (e.g., "summarize", "join").
  string kind = 1;

  // Operator ID within the query plan. Correlates with QueryWarning.operator_id.
  uint32 operator_id = 2;

  // Key-value diagnostic entries.
  repeated KeyValue values = 3;
}

// A string key-value pair used for diagnostic entries.
message KeyValue {
  // Entry key.
  string key = 1;
  // Entry value, rendered as a string.
  string value = 2;
}

// Segment planning progress, reported while the query planner is still running.
message PlanningProgress {
  // Number of segments that have completed planning.
  uint64 segments_done = 1;

  // Total number of segments to plan. Planning is complete when
  // segments_done == segments_total.
  uint64 segments_total = 2;
}

// Per-bin completion progress for `summarize ... by bin()` queries.
message BinProgress {
  // Start value of the first bin boundary.
  // Bin N covers [first_bin_start + N * bin_span, first_bin_start + (N+1) * bin_span).
  // Signed (sint64) because bin boundaries may be negative.
  sint64 first_bin_start = 1;

  // Width of each bin. NOTE(review): units (e.g., nanoseconds) are not stated
  // here — confirm against the server implementation.
  uint64 bin_span = 2;

  // Completion percentage (0-100) for each bin, one byte per bin.
  // Index i corresponds to bin i. Value 100 means fully scanned.
  bytes completion_percentages = 3;
}

// Signals that the query has completed successfully.
// Intentionally empty; fields can be added later without breaking clients.
message Completion {}

// A query execution error. Terminal: the stream ends after this frame.
// NOTE(review): field numbers 3 and 6 are skipped but not reserved. If they
// belonged to deleted fields, add `reserved 3, 6;` (plus the old names) to
// prevent accidental reuse with different semantics — confirm schema history.
message Error {
  // Error code discriminator (e.g., "UnknownFunction", "TypeMismatch").
  string code = 1;

  // Brief error title.
  string title = 2;

  // Support ticket ID for tracking.
  string support_ticket_id = 4;

  // Human-readable error message with source annotations.
  string message = 5;

  // Source code location where the error occurred.
  Location location = 7;

  // Structured error details as JSON.
  string details = 8;
}

// A range within the query source text.
// NOTE(review): whether offsets/lines/columns are 0- or 1-based, and whether the
// end is inclusive or exclusive, is not stated here — confirm with the server.
message Location {
  // Byte offset of the range start within the query string.
  uint32 start_byte = 1;
  // Byte offset of the range end.
  uint32 end_byte = 2;
  // Line of the range start.
  uint32 start_line = 3;
  // Column of the range start.
  uint32 start_column = 4;
  // Line of the range end.
  uint32 end_line = 5;
  // Column of the range end.
  uint32 end_column = 6;
}

// A partial failure for one or more segments that could not be read.
// The query still returns results, but they may be incomplete.
message PartialFailure {
  // IDs of the affected segments.
  repeated string segment_ids = 1;

  // Human-readable error description.
  string message = 2;
}

// Visualization metadata from the `render` operator.
message VisualizationMetadata {
  // Visualization type (e.g., "table", "timechart", "piechart", "linechart").
  // A string rather than an enum so new chart types need no schema change.
  optional string visualization_type = 1;

  // Visualization properties (e.g., x-column, y-columns, legend).
  // Map iteration order is undefined; do not rely on it.
  map<string, string> properties = 2;
}

// Metadata for the current result set.
message ResultMetadata {
  // Segments that could not be read (partial data loss).
  repeated PartialFailure partial_failures = 1;

  // Visualization hints from the `render` operator, if present.
  optional VisualizationMetadata visualization = 2;

  // Execution warnings (e.g., "summarize memory limit reached", "result truncated").
  repeated QueryWarning warnings = 3;
}

// A warning produced during query execution. Non-terminal: the stream continues.
message QueryWarning {
  // Operator ID for source correlation (matches OperatorDiagnostics.operator_id).
  uint32 operator_id = 1;

  // Branch index for fork queries (0 = main branch). Absent for non-fork queries.
  optional uint32 branch_index = 2;

  // Warning kind discriminator (e.g., "SummarizeMemoryLimit", "ResultTruncated").
  string kind = 3;

  // Source location where the warning originated, when attributable.
  optional Location location = 4;

  // Human-readable warning message.
  string message = 5;

  // Structured warning details as JSON.
  string details = 6;
}

On this page