
codeupipe

Python pipeline framework — composable Payload → Filter → Pipeline pattern with streaming support. Zero external dependencies.

Experimental successor to codeuchain (Python only).

Core Concepts

| Concept | Role |
| --- | --- |
| `Payload` | Immutable data container flowing through the pipeline |
| `MutablePayload` | Mutable sibling for performance-critical bulk edits |
| `Filter` | Processing unit — takes a Payload in, returns a transformed Payload out (sync or async) |
| `StreamFilter` | Streaming processing unit — receives one chunk, yields 0, 1, or N output chunks |
| `Pipeline` | Orchestrator — `.run()` for batch, `.stream()` for streaming |
| `Valve` | Conditional flow control — gates a Filter with a predicate |
| `Tap` | Non-modifying observation point — inspect without changing (sync or async) |
| `State` | Pipeline execution metadata — tracks what ran, what was skipped, errors, chunk counts |
| `Hook` | Lifecycle hooks — before/after/on_error for pipeline execution (sync or async) |
| `RetryFilter` | Resilience wrapper — retries a Filter up to N times before giving up |
| `CircuitOpenError` | Raised when a pipeline circuit breaker is open and rejecting calls |

Install

pip install -e .   # editable install from a checkout of this repository

Quick Start

import asyncio
from codeupipe import Payload, Pipeline

# Filters can be sync or async — both work
class CleanInput:
    def call(self, payload):
        return payload.insert("text", payload.get("text", "").strip())

class Validate:
    def call(self, payload):
        if not payload.get("text"):
            raise ValueError("Empty input")
        return payload

# Build and run
pipeline = Pipeline()
pipeline.add_filter(CleanInput(), name="clean")
pipeline.add_filter(Validate(), name="validate")

result = asyncio.run(pipeline.run(Payload({"text": "  hello  "})))
print(result.get("text"))  # "hello"
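The comment above notes that filters may be sync or async; an async variant keeps the same Payload-in / Payload-out shape. A hedged sketch (the awaited sleep is a stand-in for real I/O, not part of the library):

```python
import asyncio

class EnrichAsync:
    """Async filter: same call contract as above, but awaitable."""
    async def call(self, payload):
        await asyncio.sleep(0)  # stand-in for real async I/O (HTTP, DB, ...)
        return payload.insert("length", len(payload.get("text", "")))
```

Added with `pipeline.add_filter(EnrichAsync(), name="enrich")`, it runs in the same `pipeline.run(...)` call alongside sync filters.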

Valve (Conditional Flow)

from codeupipe import Valve

class DiscountFilter:
    def call(self, payload):
        price = payload.get("price", 0)
        return payload.insert("price", price * 0.9)

# Only applies when predicate returns True
pipeline.add_filter(
    Valve("discount", DiscountFilter(), lambda p: p.get("tier") == "premium"),
    name="discount",
)

Tap (Observation)

class AuditTap:
    async def observe(self, payload):
        print(f"Payload at this point: {payload.to_dict()}")

pipeline.add_tap(AuditTap(), name="audit")

Streaming

Process an async stream of chunks through the same pipeline with constant memory usage.

from codeupipe import Payload, Pipeline

class UppercaseFilter:
    def call(self, payload):
        return payload.insert("name", payload.get("name", "").upper())

async def names():
    for n in ["alice", "bob", "charlie"]:
        yield Payload({"name": n})

async def main():
    pipeline = Pipeline()
    pipeline.add_filter(UppercaseFilter(), name="upper")

    async for result in pipeline.stream(names()):
        print(result.get("name"))  # ALICE, BOB, CHARLIE

asyncio.run(main())

Use StreamFilter to drop, fan-out, or batch:

from typing import AsyncIterator

class DropEmpty:
    async def stream(self, chunk: Payload) -> AsyncIterator[Payload]:
        if chunk.get("line", "").strip():
            yield chunk

class SplitWords:
    async def stream(self, chunk: Payload) -> AsyncIterator[Payload]:
        for word in chunk.get("text", "").split():
            yield Payload({"word": word})

Synchronous Execution

No manual asyncio.run() needed — run_sync() handles it:

pipeline = Pipeline()
pipeline.add_filter(CleanInput(), name="clean")
pipeline.add_filter(Validate(), name="validate")

result = pipeline.run_sync(Payload({"text": "  hello  "}))
print(result.get("text"))  # "hello"

Parallel Execution (Fan-out / Fan-in)

Run independent filters concurrently; results merge back into the payload:

pipeline = Pipeline()
pipeline.add_parallel([
    FetchUserFilter(),
    FetchOrdersFilter(),
    FetchRecommendationsFilter(),
], name="fan-out")

result = pipeline.run_sync(Payload({"user_id": 42}))
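The three fetch filters above are not defined in this README; a minimal hedged sketch in which each branch writes its own key (the bodies are placeholders, not real service calls, and the merge-by-key behavior is inferred from "results merge back into the payload"):

```python
class FetchUserFilter:
    def call(self, payload):
        uid = payload.get("user_id")
        # placeholder for a user-service lookup
        return payload.insert("user", {"id": uid, "name": f"user-{uid}"})

class FetchOrdersFilter:
    def call(self, payload):
        # placeholder for an orders query
        return payload.insert("orders", [{"order_id": 1}])

class FetchRecommendationsFilter:
    def call(self, payload):
        # placeholder for a recommendations call
        return payload.insert("recommendations", ["starter-kit"])
```

Because each branch touches a distinct key, the fan-in merge cannot conflict.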

Pipeline Nesting

Compose pipelines from smaller pipelines:

validation = Pipeline()
validation.add_filter(CleanInput(), name="clean")
validation.add_filter(Validate(), name="validate")

processing = Pipeline()
processing.add_pipeline(validation, name="validation-sub")
processing.add_filter(TransformFilter(), name="transform")

result = processing.run_sync(Payload({"text": "  hello  "}))

Retry & Circuit Breaker

Pipeline-level resilience wrappers:

# Retry the entire pipeline up to 3 times on failure
retrying = pipeline.with_retry(max_retries=3)
result = retrying.run_sync(Payload({"input": "data"}))

# Open the circuit after 5 consecutive failures
from codeupipe import CircuitOpenError

breaker = pipeline.with_circuit_breaker(failure_threshold=5)
try:
    result = breaker.run_sync(Payload({"input": "data"}))
except CircuitOpenError:
    print("Service unavailable — circuit is open")

Execution State

result = await pipeline.run(payload)     # inside an async function
print(pipeline.state.executed)           # ['clean', 'validate']
print(pipeline.state.skipped)            # ['admin_only']
print(pipeline.state.chunks_processed)   # {'upper': 3}  (streaming mode)
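Core Concepts lists Hook (before/after/on_error), but no section above demonstrates one. A hedged sketch, assuming the method names from that table and the `use_hook` registration shown later under Auth & Vault — the exact signatures are assumptions:

```python
import time

class TimingHook:
    """Lifecycle hook: log wall-clock duration of each pipeline run."""
    def before(self, payload):
        self._t0 = time.perf_counter()

    def after(self, payload):
        print(f"pipeline finished in {time.perf_counter() - self._t0:.3f}s")

    def on_error(self, payload, error):
        print(f"pipeline failed: {error!r}")

# registration, per the use_hook call shown in the vault example:
# pipeline.use_hook(TimingHook())
```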

Docs

| Document | Purpose |
| --- | --- |
| INDEX.md | Project structure map (verified by `cup doc-check`) |
| CONCEPTS.md | Full API reference with runnable examples |
| BEST_PRACTICES.md | Project structure, naming, testing strategy |
| SKILL.md | Agent skill reference — types, patterns, conversion |
| ROADMAP.md | Expansion rings — from framework to platform |

CLI (cup)

The cup command-line tool scaffolds, lints, and analyzes CUP projects:

cup new filter validate_email src/signup   # Scaffold a filter + test
cup new pipeline signup src/signup --steps validate_email hash_password
cup list                                   # Show available component types
cup bundle src/signup                      # Generate __init__.py re-exports
cup lint src/signup                        # Check CUP conventions (CUP000–CUP008)
cup coverage src/signup                    # Map component↔test coverage gaps
cup report src/signup                      # Health report with scores, orphans, staleness
cup doc-check .                            # Verify doc freshness (cup:ref markers)
cup run pipeline.json --discover ./filters # Execute a pipeline from config
cup connect --list                         # Show configured connectors
cup connect --health                       # Run connector health checks
cup marketplace search "payments"          # Search community connectors
cup marketplace info codeupipe-stripe      # Detailed connector info
cup marketplace install codeupipe-stripe   # Install from PyPI
cup describe pipeline.json                 # Inspect pipeline inputs, outputs, steps
cup describe pipeline.json --json          # Machine-readable output (--json works globally)
cup distribute checkpoint cp.json --status # Manage payload checkpoints
cup distribute remote https://api.example  # Test a remote filter endpoint
cup test                                   # Smart test runner with markers
cup doctor                                 # Project health diagnostics
cup graph pipeline.json                    # Mermaid pipeline visualization
cup version --bump patch                   # Show/bump semver

Marketplace & Community Connectors

The codeupipe Marketplace is a community-driven index of connectors and components. Packages live on PyPI — the marketplace makes them discoverable.

# Find connectors
cup marketplace search "ai"
# → codeupipe-google-ai ✅ (v0.1.0) — Multimodal generation, embeddings, and vision

# Install (wrapper around pip install)
cup marketplace install codeupipe-google-ai

# Connector self-registers — immediately available
cup connect --list
# → google-ai: GeminiGenerate, GeminiGenerateStream, GeminiEmbed, GeminiVision

Available Connectors

| Package | Provider | Filters |
| --- | --- | --- |
| codeupipe-google-ai | Google AI (Gemini) | GeminiGenerate, GeminiGenerateStream, GeminiEmbed, GeminiVision |
| codeupipe-stripe | Stripe | StripeCheckout, StripeSubscription, StripeWebhook, StripeCustomer |
| codeupipe-postgres | PostgreSQL | PostgresQuery, PostgresExecute, PostgresTransaction, PostgresBulkInsert |
| codeupipe-resend | Resend | ResendEmail, ResendTemplate |

Publish Your Own

Built a connector? Share it with the community:

  1. Publish your package to PyPI with codeupipe.connectors entry points
  2. Fork codeuchain/codeupipe-marketplace
  3. Add a manifest.json for your package
  4. Open a PR — CI validates automatically

See the Marketplace Contributing Guide for the full walkthrough.

Auth & Vault

codeupipe ships a proxy token vault that decouples pipelines from real credentials. Instead of passing raw access tokens through Payload, the vault issues opaque cup_tok_* references. Filters only ever see the proxy — resolution to the real credential happens at the trust boundary via VaultHook.

┌──────────┐      cup_tok_abc123      ┌───────────┐
│ Pipeline │ ───────────────────────► │ VaultHook │
│ (filters │                          │  resolve  │
│  see tok)│ ◄─────────────────────── │  ↕ Vault  │
└──────────┘    real access_token     └───────────┘

Quick usage

from codeupipe.auth import (
    TokenVault, VaultHook, CredentialStore, GoogleOAuth,
)

store = CredentialStore("tokens.json")
vault = TokenVault(store)

# Inject proxy tokens automatically
pipeline.use_hook(VaultHook(vault, GoogleOAuth(...), scope="calendar"))
result = await pipeline.run(Payload({"user": "alice"}))
# Filters receive payload["credential"] as a cup_tok_* proxy

Vault CLI

cup vault issue google calendar          # Issue a scoped proxy token
cup vault resolve cup_tok_abc123         # Resolve to real credential (admin only)
cup vault revoke cup_tok_abc123          # Revoke a single token
cup vault revoke-all                     # Revoke every active token
cup vault list                           # Show active proxy tokens
cup vault status                         # Vault summary + ledger stats

Every vault action is recorded in the TokenLedger audit trail.

Testing Utilities

codeupipe.testing provides zero-boilerplate test helpers:

from codeupipe.testing import run_filter, assert_payload, mock_filter

def test_my_filter():
    result = run_filter(MyFilter(), {"input": "data"})
    assert_payload(result, output="expected")

def test_with_mock():
    f = mock_filter(status="ok")
    result = run_filter(f, {"x": 1})
    assert f.call_count == 1

Test

pytest  # 1844 tests

License

Apache 2.0
