strictflow


Namestrictflow JSON
Version 0.1.0 PyPI version JSON
download
home_pageNone
SummaryA lightweight, strict flow controller for enforcing thread-safe Read (P2) and Write (P1) priority in concurrent Python applications.
upload_time2025-11-03 05:28:38
maintainerNone
docs_urlNone
authorNone
requires_python>=3.8
licenseNone
keywords concurrency thread-safe priority locking queue strict
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            StrictFlow 🔒

A strict, thread-safe flow controller for Python concurrency, enforcing Read (P2) and Write (P1) priority.

💡 The Problem

In high-concurrency, multi-threaded applications (e.g., financial systems, industrial IoT, or server configurations), simultaneous reads and writes to a single piece of global state can lead to data corruption, inconsistent reads, or fatal race conditions. This is especially true for data that cannot fail or be read mid-update (e.g., blockchain transaction hashes, live configuration objects).

Standard Python locking mechanisms often ensure safety but lack priority and ordered execution.

✨ The StrictFlow Solution

StrictFlow implements a single-thread, priority-based task queue that provides exclusive access for critical operations, ensuring data integrity is never compromised.

It enforces two critical priority levels:

P1 Write (Critical): Tasks decorated with @write are the highest priority. When a P1 task is executed, all P2 Read tasks are paused and blocked until the write is complete and stable. This guarantees the writer has exclusive access.

P2 Read (High-Concurrency): Tasks decorated with @read can run highly concurrent logic internally (using asyncio), but they must wait synchronously for any active P1 Write task to finish.

This pattern ensures readers never see unstable or incomplete data.

📦 Installation

pip install strictflow


(Note: Once uploaded to PyPI, this command will work.)

🚀 Quick Start Example

This example demonstrates how a P1 Write task (updating config) blocks P2 Read tasks to prevent them from reading the old version (V0) or an unstable version (V1 mid-write).

import threading
import time
import asyncio
import logging
from strictflow import StrictFlow, read, write, setup_logging
from strictflow.core import logger # Use the internal logger for application logs

# 1. The user MUST activate logging to see output
setup_logging(logging.DEBUG) 

GLOBAL_CONFIG = {"version": 0, "status": "stable"}

def main():
    flow = StrictFlow()
    loop_thread = threading.Thread(target=flow.run_loop, daemon=True, name="FlowLoop")
    loop_thread.start()
    
    @write(flow)
    def update_config(new_version: int, new_status: str):
        """P1 Write: Simulates a critical, blocking update."""
        global GLOBAL_CONFIG
        print(f"   [P1] Working: Simulating 1.5s write for V{new_version}...")
        time.sleep(1.5) 
        
        GLOBAL_CONFIG['version'] = new_version
        GLOBAL_CONFIG['status'] = new_status
        print(f"   [P1] Complete: Config updated to V{new_version}.")

    
    @read(flow)
    async def fetch_data_async(reader_id: str):
        """P2 Read: Can run concurrently but MUST wait for P1."""
        for i in range(3):
            await asyncio.sleep(0.3) 
            current_config = GLOBAL_CONFIG.copy()
            print(f"   [P2] Reader {reader_id} read | V{current_config['version']} | Iteration {i+1}")
            
    # --- SIMULATION ---
    fetch_data_async(reader_id="R-A") # P2 task submitted
    
    time.sleep(0.5) 
    
    update_config(new_version=1, new_status="deploying") # P1 task submitted (Blocks R-A)

    fetch_data_async(reader_id="R-B") # P2 task submitted (Will also wait for P1)

    time.sleep(7.5) # Wait for all tasks to finish
    flow.stop_loop()
    loop_thread.join() 

if __name__ == "__main__":
    main()


🛠️ Key Concepts

1. Thread-Safe Execution

All tasks (P1 and P2) are submitted to and executed sequentially by a single, dedicated worker thread (SF-LOOP). This single-threaded execution context eliminates concurrency issues at the data access level.

2. Priority Control (P1 vs P2)

While all tasks execute in the dedicated thread, the P1/P2 mechanism controls when P2 tasks are allowed to begin. P2 tasks actively check a status flag set by the P1 writer. If the flag is clear, they block the SF-LOOP thread until the P1 task is complete.

3. Asynchronous Reads

The @read decorator allows the wrapped function to be async. When the P2 task is executed by the SF-LOOP thread, it runs the async function using asyncio.run(), allowing the P2 task to achieve high internal concurrency (multiple fast reads) without requiring locks on the shared data outside the loop.

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "strictflow",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": "concurrency, thread-safe, priority, locking, queue, strict",
    "author": null,
    "author_email": "chiro <chiro6466@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/3a/60/9f837682aedba499c8841f3dbceaadec8631eb1b9f1e54bde3ce31404239/strictflow-0.1.0.tar.gz",
    "platform": null,
    "description": "StrictFlow \ud83d\udd12\r\n\r\nA strict, thread-safe flow controller for Python concurrency, enforcing Read (P2) and Write (P1) priority.\r\n\r\n\ud83d\udca1 The Problem\r\n\r\nIn high-concurrency, multi-threaded applications (e.g., financial systems, industrial IoT, or server configurations), simultaneous reads and writes to a single piece of global state can lead to data corruption, inconsistent reads, or fatal race conditions. This is especially true for data that cannot fail or be read mid-update (e.g., blockchain transaction hashes, live configuration objects).\r\n\r\nStandard Python locking mechanisms often ensure safety but lack priority and ordered execution.\r\n\r\n\u2728 The StrictFlow Solution\r\n\r\nStrictFlow implements a single-thread, priority-based task queue that provides exclusive access for critical operations, ensuring data integrity is never compromised.\r\n\r\nIt enforces two critical priority levels:\r\n\r\nP1 Write (Critical): Tasks decorated with @write are the highest priority. When a P1 task is executed, all P2 Read tasks are paused and blocked until the write is complete and stable. This guarantees the writer has exclusive access.\r\n\r\nP2 Read (High-Concurrency): Tasks decorated with @read can run highly concurrent logic internally (using asyncio), but they must wait synchronously for any active P1 Write task to finish.\r\n\r\nThis pattern ensures readers never see unstable or incomplete data.\r\n\r\n\ud83d\udce6 Installation\r\n\r\npip install strictflow\r\n\r\n\r\n(Note: Once uploaded to PyPI, this command will work.)\r\n\r\n\ud83d\ude80 Quick Start Example\r\n\r\nThis example demonstrates how a P1 Write task (updating config) blocks P2 Read tasks to prevent them from reading the old version (V0) or an unstable version (V1 mid-write).\r\n\r\nimport threading\r\nimport time\r\nimport asyncio\r\nimport logging\r\nfrom strictflow import StrictFlow, read, write, setup_logging\r\nfrom strictflow.core import logger # Use the internal logger for application logs\r\n\r\n# 1. The user MUST activate logging to see output\r\nsetup_logging(logging.DEBUG) \r\n\r\nGLOBAL_CONFIG = {\"version\": 0, \"status\": \"stable\"}\r\n\r\ndef main():\r\n    flow = StrictFlow()\r\n    loop_thread = threading.Thread(target=flow.run_loop, daemon=True, name=\"FlowLoop\")\r\n    loop_thread.start()\r\n    \r\n    @write(flow)\r\n    def update_config(new_version: int, new_status: str):\r\n        \"\"\"P1 Write: Simulates a critical, blocking update.\"\"\"\r\n        global GLOBAL_CONFIG\r\n        print(f\"   [P1] Working: Simulating 1.5s write for V{new_version}...\")\r\n        time.sleep(1.5) \r\n        \r\n        GLOBAL_CONFIG['version'] = new_version\r\n        GLOBAL_CONFIG['status'] = new_status\r\n        print(f\"   [P1] Complete: Config updated to V{new_version}.\")\r\n\r\n    \r\n    @read(flow)\r\n    async def fetch_data_async(reader_id: str):\r\n        \"\"\"P2 Read: Can run concurrently but MUST wait for P1.\"\"\"\r\n        for i in range(3):\r\n            await asyncio.sleep(0.3) \r\n            current_config = GLOBAL_CONFIG.copy()\r\n            print(f\"   [P2] Reader {reader_id} read | V{current_config['version']} | Iteration {i+1}\")\r\n            \r\n    # --- SIMULATION ---\r\n    fetch_data_async(reader_id=\"R-A\") # P2 task submitted\r\n    \r\n    time.sleep(0.5) \r\n    \r\n    update_config(new_version=1, new_status=\"deploying\") # P1 task submitted (Blocks R-A)\r\n\r\n    fetch_data_async(reader_id=\"R-B\") # P2 task submitted (Will also wait for P1)\r\n\r\n    time.sleep(7.5) # Wait for all tasks to finish\r\n    flow.stop_loop()\r\n    loop_thread.join() \r\n\r\nif __name__ == \"__main__\":\r\n    main()\r\n\r\n\r\n\ud83d\udee0\ufe0f Key Concepts\r\n\r\n1. Thread-Safe Execution\r\n\r\nAll tasks (P1 and P2) are submitted to and executed sequentially by a single, dedicated worker thread (SF-LOOP). This single-threaded execution context eliminates concurrency issues at the data access level.\r\n\r\n2. Priority Control (P1 vs P2)\r\n\r\nWhile all tasks execute in the dedicated thread, the P1/P2 mechanism controls when P2 tasks are allowed to begin. P2 tasks actively check a status flag set by the P1 writer. If the flag is clear, they block the SF-LOOP thread until the P1 task is complete.\r\n\r\n3. Asynchronous Reads\r\n\r\nThe @read decorator allows the wrapped function to be async. When the P2 task is executed by the SF-LOOP thread, it runs the async function using asyncio.run(), allowing the P2 task to achieve high internal concurrency (multiple fast reads) without requiring locks on the shared data outside the loop.\r\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "A lightweight, strict flow controller for enforcing thread-safe Read (P2) and Write (P1) priority in concurrent Python applications.",
    "version": "0.1.0",
    "project_urls": {
        "Homepage": "https://github.com/chiro6466/strictflow/blob/main/strictflow"
    },
    "split_keywords": [
        "concurrency",
        " thread-safe",
        " priority",
        " locking",
        " queue",
        " strict"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "6bb8b9c825d3e9d4ad630af1b03244501c90a3483f9da91628e0137964f7246c",
                "md5": "bb08f4962c9a28aab6ac1fd8e510d47a",
                "sha256": "2285c33abe5e74f21386daa1be31d3928aa582fbbf765b0d38fb13bd513da61f"
            },
            "downloads": -1,
            "filename": "strictflow-0.1.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "bb08f4962c9a28aab6ac1fd8e510d47a",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 9690,
            "upload_time": "2025-11-03T05:28:36",
            "upload_time_iso_8601": "2025-11-03T05:28:36.680676Z",
            "url": "https://files.pythonhosted.org/packages/6b/b8/b9c825d3e9d4ad630af1b03244501c90a3483f9da91628e0137964f7246c/strictflow-0.1.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "3a609f837682aedba499c8841f3dbceaadec8631eb1b9f1e54bde3ce31404239",
                "md5": "405f6310a71aee35b2fc71f7a766f77b",
                "sha256": "6771e84060a0db49ed372fa21ce4256b6cb1c81bed0b8d3d204bc277f0c6f46e"
            },
            "downloads": -1,
            "filename": "strictflow-0.1.0.tar.gz",
            "has_sig": false,
            "md5_digest": "405f6310a71aee35b2fc71f7a766f77b",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 10279,
            "upload_time": "2025-11-03T05:28:38",
            "upload_time_iso_8601": "2025-11-03T05:28:38.069242Z",
            "url": "https://files.pythonhosted.org/packages/3a/60/9f837682aedba499c8841f3dbceaadec8631eb1b9f1e54bde3ce31404239/strictflow-0.1.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-11-03 05:28:38",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "chiro6466",
    "github_project": "strictflow",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "strictflow"
}
        
Elapsed time: 0.67250s