LP: 6440
Title: LuxStore Historical Storage
Status: Draft
Category: Research
Description: LuxStore Historical Storage specification for LuxDA Bus
Abstract
This LP defines LuxStore, the historical blob storage and query layer for LuxDA. While DA operators provide short-term availability guarantees, LuxStore enables long-term archival and efficient querying of historical data.
Motivation
Applications need historical data access beyond DA retention:
- Historical Queries: Retrieve messages by time range, namespace, or sender
- Archival: Long-term preservation of important data
- Indexing: Efficient search and filtering
- Proofs: Bind query results to canonical headers
Specification
1. Store Node Architecture
1.1 Components
type StoreNode struct {
    // Blob storage backend
    BlobStore BlobStorage

    // Index databases
    NamespaceIndex *NamespaceIndexDB
    TimeIndex      *TimeIndexDB
    SenderIndex    *SenderIndexDB

    // Header chain client
    HeaderChain HeaderChainClient

    // DA layer client (used by FetchBlob, Section 5.2)
    DAClient DAClient

    // P2P network
    Network P2PClient

    // Configuration
    Config StoreConfig
}
type StoreConfig struct {
    // Storage limits
    MaxStorageBytes uint64
    RetentionPeriod time.Duration

    // Indexing options
    IndexNamespaces []NamespaceId // Empty = all
    IndexSenders    bool

    // Serving options
    MaxQueryResults uint32
    RateLimits      RateLimitConfig
}
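For illustration, a store node archiving two specific namespaces for 90 days might be configured as follows. The values, and the nsA/nsB identifiers, are examples only, not recommended defaults:

// Illustrative configuration; nsA and nsB are hypothetical namespace IDs.
cfg := StoreConfig{
    MaxStorageBytes: 2 << 40,                 // 2 TiB cap
    RetentionPeriod: 90 * 24 * time.Hour,     // 90 days
    IndexNamespaces: []NamespaceId{nsA, nsB}, // an empty slice would mean "all"
    IndexSenders:    true,
    MaxQueryResults: 1000,
}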
1.2 Storage Backend
type BlobStorage interface {
    // Store blob with metadata
    Put(commitment []byte, blob []byte, meta *BlobMeta) error

    // Retrieve blob by commitment
    Get(commitment []byte) ([]byte, *BlobMeta, error)

    // Check existence
    Has(commitment []byte) bool

    // Delete blob
    Delete(commitment []byte) error

    // Iterate blobs
    Iterate(fn func(commitment []byte, meta *BlobMeta) bool) error

    // Total bytes stored (used by retention, Section 6.1)
    Size() uint64
}

type BlobMeta struct {
    NamespaceId [20]byte
    Seq         uint64
    BlockHeight uint64
    Timestamp   uint64
    BlobLen     uint32
    StoredAt    uint64
    ExpiresAt   uint64
}
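The interface is deliberately backend-agnostic. As a reference point, a minimal in-memory implementation (suitable for tests only; a production backend would persist to disk) could look like this, reusing the ErrBlobNotFound sentinel from Section 5.2:

// memStore is a minimal in-memory BlobStorage for tests.
// Requires the standard library "sync" package.
type memStore struct {
    mu    sync.RWMutex
    blobs map[string][]byte
    metas map[string]*BlobMeta
    size  uint64
}

func newMemStore() *memStore {
    return &memStore{blobs: map[string][]byte{}, metas: map[string]*BlobMeta{}}
}

func (m *memStore) Put(commitment, blob []byte, meta *BlobMeta) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    k := string(commitment)
    if old, ok := m.blobs[k]; ok {
        m.size -= uint64(len(old)) // replace-aware size accounting
    }
    m.blobs[k] = blob
    m.metas[k] = meta
    m.size += uint64(len(blob))
    return nil
}

func (m *memStore) Get(commitment []byte) ([]byte, *BlobMeta, error) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    blob, ok := m.blobs[string(commitment)]
    if !ok {
        return nil, nil, ErrBlobNotFound
    }
    return blob, m.metas[string(commitment)], nil
}

func (m *memStore) Has(commitment []byte) bool {
    m.mu.RLock()
    defer m.mu.RUnlock()
    _, ok := m.blobs[string(commitment)]
    return ok
}

func (m *memStore) Delete(commitment []byte) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    k := string(commitment)
    if old, ok := m.blobs[k]; ok {
        m.size -= uint64(len(old))
    }
    delete(m.blobs, k)
    delete(m.metas, k)
    return nil
}

func (m *memStore) Iterate(fn func(commitment []byte, meta *BlobMeta) bool) error {
    m.mu.RLock()
    defer m.mu.RUnlock()
    for k, meta := range m.metas {
        if !fn([]byte(k), meta) {
            break
        }
    }
    return nil
}

func (m *memStore) Size() uint64 {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return m.size
}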
2. Query API
2.1 Query Service
service StoreQueryService {
    // Query by namespace and sequence range
    rpc QueryByNamespace(NamespaceQuery) returns (stream BlobResult);

    // Query by time range
    rpc QueryByTime(TimeQuery) returns (stream BlobResult);

    // Query by sender
    rpc QueryBySender(SenderQuery) returns (stream BlobResult);

    // Get specific blob
    rpc GetBlob(GetBlobRequest) returns (GetBlobResponse);

    // Get blob with proof
    rpc GetBlobWithProof(GetBlobWithProofRequest) returns (GetBlobWithProofResponse);
}

message NamespaceQuery {
    bytes namespace_id = 1;
    uint64 from_seq = 2;
    uint64 to_seq = 3;
    uint32 limit = 4;
    uint32 offset = 5;
    bool include_blobs = 6;
}

message TimeQuery {
    uint64 from_timestamp = 1;
    uint64 to_timestamp = 2;
    repeated bytes namespace_ids = 3; // Filter by namespaces
    uint32 limit = 4;
    bool include_blobs = 5;
}

message BlobResult {
    bytes header = 1;
    bytes blob = 2;
    bytes proof = 3;
}
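Assuming protoc-generated Go stubs for this service (the storepb import path below is illustrative, not part of this LP), a client might stream a namespace range like this:

import (
    "context"
    "io"
    "log"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"

    storepb "example.org/luxda/store/pb" // illustrative import path for generated stubs
)

func streamNamespace(addr string, nsID [20]byte) error {
    conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        return err
    }
    defer conn.Close()

    client := storepb.NewStoreQueryServiceClient(conn)
    stream, err := client.QueryByNamespace(context.Background(), &storepb.NamespaceQuery{
        NamespaceId:  nsID[:],
        FromSeq:      0,
        ToSeq:        999,
        Limit:        100,
        IncludeBlobs: true,
    })
    if err != nil {
        return err
    }
    for {
        res, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        log.Printf("header=%dB blob=%dB", len(res.Header), len(res.Blob))
    }
}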
2.2 Query Processing
func (s *StoreNode) QueryByNamespace(q *NamespaceQuery) ([]*BlobResult, error) {
    // The wire format carries namespace_id as bytes; convert to the
    // fixed-size key type used by the index.
    var nsId [20]byte
    copy(nsId[:], q.NamespaceId)

    // Query index
    entries, err := s.NamespaceIndex.Range(nsId, q.FromSeq, q.ToSeq, q.Limit, q.Offset)
    if err != nil {
        return nil, err
    }

    // Fetch results
    results := make([]*BlobResult, 0, len(entries))
    for _, entry := range entries {
        result := &BlobResult{
            Header: entry.Header,
        }
        if q.IncludeBlobs {
            blob, _, err := s.BlobStore.Get(entry.BlobCommitment[:])
            if err == nil {
                result.Blob = blob
            }
        }
        results = append(results, result)
    }
    return results, nil
}
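Clients are expected to page through large ranges with Limit and Offset rather than requesting unbounded result sets. A sketch, with pageSize and the handle callback as placeholders:

// Paging sketch; pageSize and handle are illustrative placeholders.
const pageSize = 500
for offset := uint32(0); ; offset += pageSize {
    results, err := store.QueryByNamespace(&NamespaceQuery{
        NamespaceId: nsID[:],
        FromSeq:     fromSeq,
        ToSeq:       toSeq,
        Limit:       pageSize,
        Offset:      offset,
    })
    if err != nil {
        return err
    }
    handle(results)
    if uint32(len(results)) < pageSize {
        break // short page: no more results in range
    }
}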
3. Indexing
3.1 Namespace Index
type NamespaceIndexDB struct {
    db *leveldb.DB
}

// Key:   namespace_id || seq
// Value: IndexEntry
type IndexEntry struct {
    BlobCommitment [32]byte
    BlockHeight    uint64
    Timestamp      uint64
    Header         []byte
}

func (idx *NamespaceIndexDB) Put(nsId [20]byte, seq uint64, entry *IndexEntry) error {
    key := append(nsId[:], uint64ToBytes(seq)...)
    value := encodeEntry(entry)
    return idx.db.Put(key, value, nil)
}

func (idx *NamespaceIndexDB) Range(nsId [20]byte, fromSeq, toSeq uint64, limit, offset uint32) ([]*IndexEntry, error) {
    startKey := append(nsId[:], uint64ToBytes(fromSeq)...)
    endKey := append(nsId[:], uint64ToBytes(toSeq+1)...)
    iter := idx.db.NewIterator(&util.Range{Start: startKey, Limit: endKey}, nil)
    defer iter.Release()

    var entries []*IndexEntry
    skipped := uint32(0)
    for iter.Next() {
        if skipped < offset {
            skipped++
            continue
        }
        entry := decodeEntry(iter.Value())
        entries = append(entries, entry)
        if limit > 0 && uint32(len(entries)) >= limit {
            break
        }
    }
    return entries, iter.Error()
}
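The index code above assumes a uint64ToBytes helper. It must encode big-endian so that LevelDB's lexicographic key ordering matches numeric sequence ordering, which is what makes the range scans correct:

// uint64ToBytes encodes big-endian so lexicographic byte order equals
// numeric order, the property the range scans above depend on.
func uint64ToBytes(v uint64) []byte {
    buf := make([]byte, 8)
    binary.BigEndian.PutUint64(buf, v)
    return buf
}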
3.2 Time Index
type TimeIndexDB struct {
    db *leveldb.DB
}

// Key:   timestamp || namespace_id || seq
// Value: BlobCommitment
func (idx *TimeIndexDB) Put(timestamp uint64, nsId [20]byte, seq uint64, commitment [32]byte) error {
    key := make([]byte, 8+20+8)
    binary.BigEndian.PutUint64(key[0:8], timestamp)
    copy(key[8:28], nsId[:])
    binary.BigEndian.PutUint64(key[28:36], seq)
    return idx.db.Put(key, commitment[:], nil)
}

func (idx *TimeIndexDB) Range(from, to uint64, nsIds []NamespaceId) ([]TimeIndexEntry, error) {
    startKey := uint64ToBytes(from)
    endKey := uint64ToBytes(to + 1)
    iter := idx.db.NewIterator(&util.Range{Start: startKey, Limit: endKey}, nil)
    defer iter.Release()

    var entries []TimeIndexEntry
    nsFilter := makeNsFilter(nsIds)
    for iter.Next() {
        timestamp := binary.BigEndian.Uint64(iter.Key()[0:8])
        var nsId [20]byte
        copy(nsId[:], iter.Key()[8:28])
        seq := binary.BigEndian.Uint64(iter.Key()[28:36])

        if len(nsFilter) > 0 && !nsFilter[nsId] {
            continue
        }

        var commitment [32]byte
        copy(commitment[:], iter.Value())
        entries = append(entries, TimeIndexEntry{
            Timestamp:      timestamp,
            NamespaceId:    nsId,
            Seq:            seq,
            BlobCommitment: commitment,
        })
    }
    return entries, iter.Error()
}
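makeNsFilter is left unspecified above. Assuming NamespaceId is a [20]byte array type, as the field layouts in this LP suggest, a straightforward version is:

// makeNsFilter builds the namespace set used in Range; an empty input
// means "no filter" (all namespaces pass).
func makeNsFilter(nsIds []NamespaceId) map[[20]byte]bool {
    filter := make(map[[20]byte]bool, len(nsIds))
    for _, id := range nsIds {
        filter[[20]byte(id)] = true
    }
    return filter
}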
4. Proofs
4.1 Inclusion Proof
type BlobInclusionProof struct {
    // Header inclusion in block
    HeaderProof *HeaderInclusionProof

    // Blob commitment matches header
    BlobCommitment [32]byte

    // Optional: blob matches commitment
    BlobHash [32]byte
}

func (s *StoreNode) GenerateInclusionProof(commitment [32]byte) (*BlobInclusionProof, error) {
    // Load the blob and its metadata; the metadata locates the header
    blob, meta, err := s.BlobStore.Get(commitment[:])
    if err != nil {
        return nil, err
    }

    // Get header inclusion proof from header chain
    headerProof, err := s.HeaderChain.GetInclusionProof(
        meta.NamespaceId,
        meta.Seq,
        meta.BlockHeight,
    )
    if err != nil {
        return nil, err
    }

    return &BlobInclusionProof{
        HeaderProof:    headerProof,
        BlobCommitment: commitment,
        BlobHash:       sha3.Sum256(blob),
    }, nil
}
4.2 Proof Verification
func VerifyBlobInclusionProof(proof *BlobInclusionProof, blob []byte, trustedRoot [32]byte) bool {
    // 1. Verify blob matches the claimed hash
    if sha3.Sum256(blob) != proof.BlobHash {
        return false
    }

    // 2. Verify the commitment in the header matches
    header := DecodeHeader(proof.HeaderProof.Header)
    if header.BlobCommitment != proof.BlobCommitment {
        return false
    }

    // 3. Verify header inclusion against the trusted root
    return VerifyHeaderInclusionProof(proof.HeaderProof, trustedRoot)
}
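A light client combines the two RPCs from Section 2.1 with this check: fetch a blob with its proof, then accept it only if it verifies against a locally trusted root. In the sketch below, the response field names are illustrative, since GetBlobWithProofResponse is not spelled out in this LP:

// Light-client sketch; resp field names (Proof, Blob) are illustrative.
resp, err := client.GetBlobWithProof(ctx, &storepb.GetBlobWithProofRequest{
    Commitment: commitment[:],
})
if err != nil {
    return err
}
if !VerifyBlobInclusionProof(resp.Proof, resp.Blob, trustedRoot) {
    return errors.New("blob fails inclusion proof against trusted root")
}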
5. Synchronization
5.1 Header Chain Sync
func (s *StoreNode) SyncFromHeaderChain() error {
    // Get last synced height
    lastHeight := s.GetLastSyncedHeight()

    // Stream new blocks
    blocks := s.HeaderChain.StreamBlocks(lastHeight + 1)
    for block := range blocks {
        for _, batch := range block.LaneBatches {
            // Check if we're indexing this namespace
            if !s.ShouldIndex(batch.NamespaceId) {
                continue
            }
            for _, header := range batch.Headers {
                // Index header
                s.IndexHeader(&header, block.Height)

                // Fetch and store blob
                if header.BlobLen > 0 {
                    blob, err := s.FetchBlob(header.BlobCommitment)
                    if err == nil {
                        s.StoreBlob(header.BlobCommitment, blob, &header)
                    }
                }
            }
        }
        s.SetLastSyncedHeight(block.Height)
    }
    return nil
}
5.2 Blob Fetching
func (s *StoreNode) FetchBlob(commitment [32]byte) ([]byte, error) {
    // Try DA operators first, verifying the response against the commitment
    blob, err := s.DAClient.GetBlob(commitment)
    if err == nil && sha3.Sum256(blob) == commitment {
        return blob, nil
    }

    // Fall back to peer store nodes, verifying each response
    for _, peer := range s.Network.StorePeers() {
        blob, err := peer.GetBlob(commitment)
        if err == nil && sha3.Sum256(blob) == commitment {
            return blob, nil
        }
    }
    return nil, ErrBlobNotFound
}
6. Retention Management
6.1 Retention Policy
type RetentionPolicy struct {
    // Default retention period
    DefaultRetention time.Duration

    // Per-namespace overrides
    NamespaceRetention map[NamespaceId]time.Duration

    // Retention by priority
    PriorityRetention map[Priority]time.Duration

    // Maximum storage size
    MaxStorage uint64
}

func (s *StoreNode) ApplyRetention() error {
    now := time.Now()

    // Iterate all blobs and delete any that have outlived their retention
    s.BlobStore.Iterate(func(commitment []byte, meta *BlobMeta) bool {
        retention := s.GetRetention(meta)
        expiresAt := time.Unix(int64(meta.StoredAt), 0).Add(retention)
        if now.After(expiresAt) {
            s.DeleteBlob(commitment, meta)
        }
        return true
    })

    // Check total size
    if s.BlobStore.Size() > s.Config.MaxStorageBytes {
        s.EvictLRU()
    }
    return nil
}
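GetRetention is not pinned down above. One plausible resolution order is namespace override first, then the default; a priority override would slot in between, but BlobMeta as specified carries no priority field, so that branch is omitted here. The Retention field name on StoreNode is illustrative:

// One plausible GetRetention; s.Retention (a RetentionPolicy) is assumed.
func (s *StoreNode) GetRetention(meta *BlobMeta) time.Duration {
    // Namespace-specific override wins; otherwise fall back to the default.
    if d, ok := s.Retention.NamespaceRetention[NamespaceId(meta.NamespaceId)]; ok {
        return d
    }
    return s.Retention.DefaultRetention
}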
7. Metrics
| Metric | Description |
|---|---|
| store_blobs_total | Total blobs stored |
| store_size_bytes | Total storage used |
| store_queries_total | Queries by type |
| store_query_latency_ms | Query latency |
| store_sync_height | Last synced block height |
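As one possible wiring, using Prometheus client_golang (the instrumentation library is an assumption, not a requirement of this LP):

// Assumes github.com/prometheus/client_golang/prometheus.
var (
    blobsTotal = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "store_blobs_total", Help: "Total blobs stored",
    })
    sizeBytes = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "store_size_bytes", Help: "Total storage used",
    })
    queriesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "store_queries_total", Help: "Queries by type",
    }, []string{"type"})
    queryLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name: "store_query_latency_ms", Help: "Query latency",
    })
    syncHeight = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "store_sync_height", Help: "Last synced block height",
    })
)

func init() {
    prometheus.MustRegister(blobsTotal, sizeBytes, queriesTotal, queryLatency, syncHeight)
}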
Rationale
Why Separate from DA Layer?
- DA provides short-term guaranteed availability
- Store provides long-term archival with its own economic incentives
- Different retention and query requirements
Why Index by Multiple Keys?
- Namespace queries for application state
- Time queries for analytics and debugging
- Sender queries for user data access
Why Proofs?
- Bind query results to trusted header chain
- Enable trustless verification
- Support light clients
Security Considerations
Query DOS
Mitigated by:
- Rate limiting (sketched after this list)
- Query result limits
- Pagination
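A per-client token bucket is one way to implement the rate limiting above. This sketch uses golang.org/x/time/rate; the library choice and limits are illustrative, not mandated by this LP:

// Per-client token buckets; assumes "sync" and golang.org/x/time/rate.
var limiters sync.Map // client ID -> *rate.Limiter

func allow(clientID string) bool {
    l, _ := limiters.LoadOrStore(clientID, rate.NewLimiter(rate.Limit(10), 50)) // 10 req/s, burst 50
    return l.(*rate.Limiter).Allow()
}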
Data Integrity
Ensured by:
- Blob commitment verification
- Header chain proofs
- Merkle proofs for query results
Storage Exhaustion
Managed by:
- Retention policies
- Storage quotas
- LRU eviction
Test Plan
Unit Tests
- Index Operations: CRUD on all indexes
- Query Processing: Various query types
- Proof Generation: Valid proofs for stored data
Integration Tests
- Sync Flow: Sync from header chain
- Query Flow: End-to-end queries
- Retention: Correct data expiration
Performance Tests
- Query Throughput: Queries per second
- Index Performance: Large dataset operations
- Sync Speed: Blocks per second
LP-6440 v1.0.0 - 2026-01-02