| Name | san7 JSON |
| Version |
1.1.0
JSON |
| download |
| home_page | None |
| Summary | Advanced time-based expiration and protection system for Python applications |
| upload_time | 2025-11-09 10:22:22 |
| maintainer | None |
| docs_url | None |
| author | san |
| requires_python | >=3.7 |
| license | MIT |
| keywords |
expiration
time-based
security
protection
licensing
|
| VCS |
|
| bugtrack_url |
|
| requirements |
No requirements were recorded.
|
| Travis-CI |
No Travis.
|
| coveralls test coverage |
No coveralls.
|
# s7
Advanced time-based expiration and protection system for Python applications
Version: 1.0.0
Developer: san
Contact: @ii00hh
---
## Installation
```bash
pip install s7
```
---
## Basic Usage
```python
import s7
s7.set_expiration(2025, 12, 31)
s7.check_expiration()
```
```python
import s7
s7.set_expiration(2025, 6, 15, 14, 30, 0)
s7.check_expiration()
```
```python
from s7 import ExpireDate
expire = ExpireDate()
expire.set_date(2025, 12, 31, 23, 59, 59)
expire.check()
```
---
## Complete Examples
### Simple Tool Protection
```python
import s7
s7.set_expiration(2025, 12, 31)
def main():
s7.check_expiration()
print("Tool is running...")
main()
```
### CLI Tool
```python
import s7
import argparse
s7.set_expiration(2025, 12, 31)
def process_file(filename):
s7.check_expiration()
with open(filename, 'r') as f:
data = f.read()
return data
def main():
s7.check_expiration()
parser = argparse.ArgumentParser()
parser.add_argument('--input', required=True)
args = parser.parse_args()
result = process_file(args.input)
print(result)
if __name__ == "__main__":
main()
```
### Web Application
```python
from flask import Flask
import s7
app = Flask(__name__)
s7.set_expiration(2025, 12, 31)
@app.before_request
def check_validity():
s7.check_expiration()
@app.route('/')
def index():
return "Service Active"
@app.route('/api/data')
def get_data():
s7.check_expiration()
return {"status": "ok", "data": [1, 2, 3]}
if __name__ == '__main__':
app.run()
```
### GUI Application
```python
import s7
import tkinter as tk
class App(tk.Tk):
def __init__(self):
s7.set_expiration(2025, 12, 31)
s7.check_expiration()
super().__init__()
self.title("Protected App")
self.geometry("400x300")
self.label = tk.Label(self, text="Application Running")
self.label.pack(pady=20)
self.button = tk.Button(self, text="Execute", command=self.execute)
self.button.pack()
self.after(1000, self.periodic_check)
def execute(self):
s7.check_expiration()
self.label.config(text="Task Executed")
def periodic_check(self):
s7.check_expiration()
self.after(1000, self.periodic_check)
if __name__ == "__main__":
app = App()
app.mainloop()
```
### Data Processing Script
```python
import s7
import pandas as pd
s7.set_expiration(2025, 12, 31)
def load_data(filepath):
s7.check_expiration()
return pd.read_csv(filepath)
def process_data(df):
s7.check_expiration()
df['new_column'] = df['value'] * 2
return df
def save_data(df, filepath):
s7.check_expiration()
df.to_csv(filepath, index=False)
def main():
s7.check_expiration()
data = load_data('input.csv')
processed = process_data(data)
save_data(processed, 'output.csv')
print("Processing complete")
if __name__ == "__main__":
main()
```
### API Client
```python
import s7
import requests
s7.set_expiration(2025, 12, 31)
class APIClient:
def __init__(self, base_url):
s7.check_expiration()
self.base_url = base_url
self.session = requests.Session()
def get(self, endpoint):
s7.check_expiration()
response = self.session.get(f"{self.base_url}{endpoint}")
return response.json()
def post(self, endpoint, data):
s7.check_expiration()
response = self.session.post(f"{self.base_url}{endpoint}", json=data)
return response.json()
client = APIClient("https://api.example.com")
data = client.get("/users")
result = client.post("/process", {"key": "value"})
```
### Bot Application
```python
import s7
import time
s7.set_expiration(2025, 12, 31)
class Bot:
def __init__(self):
s7.check_expiration()
self.running = True
def start(self):
s7.check_expiration()
print("Bot started")
while self.running:
s7.check_expiration()
self.process_messages()
time.sleep(1)
def process_messages(self):
s7.check_expiration()
pass
def stop(self):
s7.check_expiration()
self.running = False
bot = Bot()
bot.start()
```
### Automated Task
```python
import s7
import schedule
import time
s7.set_expiration(2025, 12, 31)
def job():
s7.check_expiration()
print("Executing scheduled task")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
while True:
s7.check_expiration()
schedule.run_pending()
time.sleep(1)
```
### Database Tool
```python
import s7
import sqlite3
s7.set_expiration(2025, 12, 31)
class Database:
def __init__(self, db_path):
s7.check_expiration()
self.conn = sqlite3.connect(db_path)
self.cursor = self.conn.cursor()
def query(self, sql, params=()):
s7.check_expiration()
self.cursor.execute(sql, params)
return self.cursor.fetchall()
def execute(self, sql, params=()):
s7.check_expiration()
self.cursor.execute(sql, params)
self.conn.commit()
def close(self):
s7.check_expiration()
self.conn.close()
db = Database('data.db')
results = db.query("SELECT * FROM users WHERE id = ?", (1,))
db.execute("INSERT INTO logs (message) VALUES (?)", ("Event logged",))
db.close()
```
### File Monitor
```python
import s7
import os
import time
s7.set_expiration(2025, 12, 31)
class FileMonitor:
def __init__(self, directory):
s7.check_expiration()
self.directory = directory
self.files = {}
def scan(self):
s7.check_expiration()
for filename in os.listdir(self.directory):
filepath = os.path.join(self.directory, filename)
mtime = os.path.getmtime(filepath)
if filename not in self.files:
print(f"New file: {filename}")
elif self.files[filename] != mtime:
print(f"Modified: {filename}")
self.files[filename] = mtime
def monitor(self):
while True:
s7.check_expiration()
self.scan()
time.sleep(5)
monitor = FileMonitor('/path/to/watch')
monitor.monitor()
```
### API Server
```python
from fastapi import FastAPI
import s7
app = FastAPI()
s7.set_expiration(2025, 12, 31)
@app.middleware("http")
async def check_expiration_middleware(request, call_next):
s7.check_expiration()
response = await call_next(request)
return response
@app.get("/")
def read_root():
s7.check_expiration()
return {"status": "running"}
@app.get("/items/{item_id}")
def read_item(item_id: int):
s7.check_expiration()
return {"item_id": item_id, "name": f"Item {item_id}"}
@app.post("/process")
def process_data(data: dict):
s7.check_expiration()
result = {"processed": True, "data": data}
return result
```
### Scraper
```python
import s7
from bs4 import BeautifulSoup
import requests
s7.set_expiration(2025, 12, 31)
class Scraper:
def __init__(self):
s7.check_expiration()
self.session = requests.Session()
def fetch(self, url):
s7.check_expiration()
response = self.session.get(url)
return response.text
def parse(self, html):
s7.check_expiration()
soup = BeautifulSoup(html, 'html.parser')
data = []
for item in soup.find_all('div', class_='item'):
s7.check_expiration()
title = item.find('h2').text
data.append({'title': title})
return data
def scrape(self, url):
s7.check_expiration()
html = self.fetch(url)
data = self.parse(html)
return data
scraper = Scraper()
results = scraper.scrape("https://example.com")
```
### Background Worker
```python
import s7
import threading
import queue
s7.set_expiration(2025, 12, 31)
class Worker:
def __init__(self):
s7.check_expiration()
self.queue = queue.Queue()
self.running = True
def add_task(self, task):
s7.check_expiration()
self.queue.put(task)
def process(self):
while self.running:
s7.check_expiration()
try:
task = self.queue.get(timeout=1)
self.execute(task)
self.queue.task_done()
except queue.Empty:
continue
def execute(self, task):
s7.check_expiration()
print(f"Executing: {task}")
def start(self):
s7.check_expiration()
thread = threading.Thread(target=self.process)
thread.start()
worker = Worker()
worker.start()
worker.add_task("Task 1")
worker.add_task("Task 2")
```
### Image Processor
```python
import s7
from PIL import Image
import os
s7.set_expiration(2025, 12, 31)
class ImageProcessor:
def __init__(self, input_dir, output_dir):
s7.check_expiration()
self.input_dir = input_dir
self.output_dir = output_dir
def resize(self, image_path, size=(800, 600)):
s7.check_expiration()
img = Image.open(image_path)
img = img.resize(size)
return img
def process_all(self):
s7.check_expiration()
for filename in os.listdir(self.input_dir):
s7.check_expiration()
if filename.endswith(('.png', '.jpg', '.jpeg')):
input_path = os.path.join(self.input_dir, filename)
output_path = os.path.join(self.output_dir, filename)
img = self.resize(input_path)
img.save(output_path)
print(f"Processed: {filename}")
processor = ImageProcessor('input', 'output')
processor.process_all()
```
### Email Tool
```python
import s7
import smtplib
from email.mime.text import MIMEText
s7.set_expiration(2025, 12, 31)
class EmailSender:
def __init__(self, smtp_server, port, username, password):
s7.check_expiration()
self.smtp_server = smtp_server
self.port = port
self.username = username
self.password = password
def send(self, to, subject, body):
s7.check_expiration()
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = self.username
msg['To'] = to
with smtplib.SMTP(self.smtp_server, self.port) as server:
server.starttls()
server.login(self.username, self.password)
server.send_message(msg)
sender = EmailSender('smtp.gmail.com', 587, 'user@example.com', 'password')
sender.send('recipient@example.com', 'Subject', 'Message body')
```
### Logger System
```python
import s7
import logging
from datetime import datetime
s7.set_expiration(2025, 12, 31)
class Logger:
def __init__(self, log_file):
s7.check_expiration()
self.log_file = log_file
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def info(self, message):
s7.check_expiration()
logging.info(message)
def error(self, message):
s7.check_expiration()
logging.error(message)
def warning(self, message):
s7.check_expiration()
logging.warning(message)
logger = Logger('app.log')
logger.info("Application started")
logger.error("Error occurred")
logger.warning("Warning message")
```
### Cache System
```python
import s7
import time
import pickle
s7.set_expiration(2025, 12, 31)
class Cache:
def __init__(self, ttl=3600):
s7.check_expiration()
self.cache = {}
self.ttl = ttl
def set(self, key, value):
s7.check_expiration()
self.cache[key] = {
'value': value,
'timestamp': time.time()
}
def get(self, key):
s7.check_expiration()
if key not in self.cache:
return None
item = self.cache[key]
if time.time() - item['timestamp'] > self.ttl:
del self.cache[key]
return None
return item['value']
def clear(self):
s7.check_expiration()
self.cache.clear()
cache = Cache(ttl=300)
cache.set('user:1', {'name': 'John', 'age': 30})
user = cache.get('user:1')
```
### Config Manager
```python
import s7
import json
s7.set_expiration(2025, 12, 31)
class Config:
def __init__(self, config_file):
s7.check_expiration()
self.config_file = config_file
self.data = {}
self.load()
def load(self):
s7.check_expiration()
try:
with open(self.config_file, 'r') as f:
self.data = json.load(f)
except FileNotFoundError:
self.data = {}
def save(self):
s7.check_expiration()
with open(self.config_file, 'w') as f:
json.dump(self.data, f, indent=2)
def get(self, key, default=None):
s7.check_expiration()
return self.data.get(key, default)
def set(self, key, value):
s7.check_expiration()
self.data[key] = value
self.save()
config = Config('config.json')
config.set('api_key', 'abc123')
api_key = config.get('api_key')
```
### Report Generator
```python
import s7
from datetime import datetime
s7.set_expiration(2025, 12, 31)
class ReportGenerator:
def __init__(self):
s7.check_expiration()
self.data = []
def add_entry(self, entry):
s7.check_expiration()
self.data.append({
'timestamp': datetime.now().isoformat(),
'entry': entry
})
def generate(self, output_file):
s7.check_expiration()
with open(output_file, 'w') as f:
f.write("REPORT\n")
f.write("=" * 50 + "\n\n")
for item in self.data:
s7.check_expiration()
f.write(f"{item['timestamp']}: {item['entry']}\n")
print(f"Report generated: {output_file}")
report = ReportGenerator()
report.add_entry("System started")
report.add_entry("Processing complete")
report.generate("report.txt")
```
### Notification System
```python
import s7
import requests
s7.set_expiration(2025, 12, 31)
class Notifier:
def __init__(self, webhook_url):
s7.check_expiration()
self.webhook_url = webhook_url
def send(self, message, level='info'):
s7.check_expiration()
payload = {
'text': message,
'level': level
}
response = requests.post(self.webhook_url, json=payload)
return response.status_code == 200
def info(self, message):
s7.check_expiration()
self.send(message, 'info')
def alert(self, message):
s7.check_expiration()
self.send(message, 'alert')
notifier = Notifier('https://hooks.example.com/webhook')
notifier.info("Task completed")
notifier.alert("Error detected")
```
### Command Executor
```python
import s7
import subprocess
s7.set_expiration(2025, 12, 31)
class CommandExecutor:
def __init__(self):
s7.check_expiration()
self.history = []
def run(self, command):
s7.check_expiration()
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True
)
self.history.append({
'command': command,
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr
})
return result
def get_history(self):
s7.check_expiration()
return self.history
executor = CommandExecutor()
result = executor.run('ls -la')
print(result.stdout)
```
### Data Validator
```python
import s7
import re
s7.set_expiration(2025, 12, 31)
class Validator:
def __init__(self):
s7.check_expiration()
pass
def email(self, value):
s7.check_expiration()
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, value))
def phone(self, value):
s7.check_expiration()
pattern = r'^\+?1?\d{9,15}$'
return bool(re.match(pattern, value))
def url(self, value):
s7.check_expiration()
pattern = r'^https?://[^\s]+$'
return bool(re.match(pattern, value))
def validate_dict(self, data, rules):
s7.check_expiration()
errors = {}
for field, rule in rules.items():
s7.check_expiration()
if field not in data:
errors[field] = "Required"
elif rule == 'email' and not self.email(data[field]):
errors[field] = "Invalid email"
return errors
validator = Validator()
print(validator.email("test@example.com"))
print(validator.phone("+1234567890"))
errors = validator.validate_dict(
{'email': 'invalid'},
{'email': 'email', 'phone': 'phone'}
)
```
### Batch Processor
```python
import s7
import time
s7.set_expiration(2025, 12, 31)
class BatchProcessor:
def __init__(self, batch_size=100):
s7.check_expiration()
self.batch_size = batch_size
def process(self, items, process_func):
s7.check_expiration()
results = []
for i in range(0, len(items), self.batch_size):
s7.check_expiration()
batch = items[i:i + self.batch_size]
for item in batch:
s7.check_expiration()
result = process_func(item)
results.append(result)
time.sleep(0.1)
return results
def process_item(item):
return item * 2
processor = BatchProcessor(batch_size=50)
items = list(range(1000))
results = processor.process(items, process_item)
```
### State Machine
```python
import s7
s7.set_expiration(2025, 12, 31)
class StateMachine:
def __init__(self, initial_state):
s7.check_expiration()
self.state = initial_state
self.transitions = {}
def add_transition(self, from_state, to_state, condition):
s7.check_expiration()
if from_state not in self.transitions:
self.transitions[from_state] = []
self.transitions[from_state].append((to_state, condition))
def transition(self, event):
s7.check_expiration()
if self.state in self.transitions:
for to_state, condition in self.transitions[self.state]:
if condition(event):
self.state = to_state
return True
return False
def get_state(self):
s7.check_expiration()
return self.state
sm = StateMachine('idle')
sm.add_transition('idle', 'running', lambda e: e == 'start')
sm.add_transition('running', 'idle', lambda e: e == 'stop')
sm.transition('start')
print(sm.get_state())
```
### Task Queue
```python
import s7
import queue
import threading
s7.set_expiration(2025, 12, 31)
class TaskQueue:
def __init__(self, num_workers=4):
s7.check_expiration()
self.queue = queue.Queue()
self.workers = []
for i in range(num_workers):
s7.check_expiration()
worker = threading.Thread(target=self.worker)
worker.daemon = True
worker.start()
self.workers.append(worker)
def worker(self):
while True:
s7.check_expiration()
task = self.queue.get()
if task is None:
break
func, args = task
func(*args)
self.queue.task_done()
def add_task(self, func, *args):
s7.check_expiration()
self.queue.put((func, args))
def wait(self):
s7.check_expiration()
self.queue.join()
def process_task(task_id):
print(f"Processing task {task_id}")
tq = TaskQueue(num_workers=2)
for i in range(10):
tq.add_task(process_task, i)
tq.wait()
```
### Plugin System
```python
import s7
import importlib
s7.set_expiration(2025, 12, 31)
class PluginManager:
def __init__(self):
s7.check_expiration()
self.plugins = {}
def load(self, plugin_name):
s7.check_expiration()
module = importlib.import_module(plugin_name)
plugin = module.Plugin()
self.plugins[plugin_name] = plugin
return plugin
def execute(self, plugin_name, method, *args):
s7.check_expiration()
if plugin_name in self.plugins:
plugin = self.plugins[plugin_name]
return getattr(plugin, method)(*args)
def list_plugins(self):
s7.check_expiration()
return list(self.plugins.keys())
manager = PluginManager()
```
### Event System
```python
import s7
s7.set_expiration(2025, 12, 31)
class EventEmitter:
def __init__(self):
s7.check_expiration()
self.listeners = {}
def on(self, event, callback):
s7.check_expiration()
if event not in self.listeners:
self.listeners[event] = []
self.listeners[event].append(callback)
def emit(self, event, *args):
s7.check_expiration()
if event in self.listeners:
for callback in self.listeners[event]:
s7.check_expiration()
callback(*args)
def off(self, event, callback):
s7.check_expiration()
if event in self.listeners:
self.listeners[event].remove(callback)
emitter = EventEmitter()
def on_data(data):
print(f"Received: {data}")
emitter.on('data', on_data)
emitter.emit('data', 'test')
```
---
## Advanced Class Usage
```python
from s7 import ExpireDate
expire = ExpireDate()
expire.set_date(2025, 12, 31, 23, 59, 59)
while True:
if not expire.check():
break
print("Running...")
```
```python
from s7 import ExpireDate, ProtectionSystem
expire = ExpireDate()
expire.set_date(2026, 1, 1)
protection = ProtectionSystem()
for i in range(100):
expire.check()
print(f"Iteration {i}")
```
```python
from s7 import TimeValidator
validator = TimeValidator()
if validator.validate_system_time():
timestamp = validator.get_secure_timestamp()
print(f"Secure timestamp: {timestamp}")
```
```python
from s7.net import NetworkTimeValidator
ntv = NetworkTimeValidator()
network_time = ntv.get_network_time()
local_time = int(time.time())
if ntv.validate_local_time(local_time):
print("Time is valid")
```
```python
from s7.mem import MemoryProtection
mem = MemoryProtection()
mem.protect('secret_key', 'my_secret_value')
if mem.verify('secret_key'):
value = mem.get('secret_key')
print(f"Protected value: {value}")
```
---
## Multiple Checks Pattern
```python
import s7
s7.set_expiration(2025, 12, 31)
def critical_operation():
s7.check_expiration()
result = process_data()
s7.check_expiration()
save_result(result)
s7.check_expiration()
return result
```
```python
import s7
s7.set_expiration(2025, 12, 31)
class Application:
def __init__(self):
s7.check_expiration()
self.data = []
def add(self, item):
s7.check_expiration()
self.data.append(item)
def process(self):
s7.check_expiration()
for item in self.data:
s7.check_expiration()
print(item)
def save(self):
s7.check_expiration()
with open('data.txt', 'w') as f:
for item in self.data:
s7.check_expiration()
f.write(f"{item}\n")
```
---
## Integration Patterns
```python
import s7
from functools import wraps
s7.set_expiration(2025, 12, 31)
def protected(func):
@wraps(func)
def wrapper(*args, **kwargs):
s7.check_expiration()
return func(*args, **kwargs)
return wrapper
@protected
def my_function():
print("Function executed")
@protected
def another_function(x, y):
return x + y
```
```python
import s7
s7.set_expiration(2025, 12, 31)
class ProtectedClass:
def __init__(self):
s7.check_expiration()
def __enter__(self):
s7.check_expiration()
return self
def __exit__(self, *args):
s7.check_expiration()
def execute(self):
s7.check_expiration()
print("Executing")
with ProtectedClass() as pc:
pc.execute()
```
---
## Complete Application Examples
### Complete REST API
```python
from flask import Flask, request, jsonify
import s7
import sqlite3
app = Flask(__name__)
s7.set_expiration(2025, 12, 31)
class Database:
def __init__(self):
s7.check_expiration()
self.conn = sqlite3.connect('app.db', check_same_thread=False)
self.init_db()
def init_db(self):
s7.check_expiration()
self.conn.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT
)
''')
self.conn.commit()
def get_all(self):
s7.check_expiration()
cursor = self.conn.execute('SELECT * FROM users')
return cursor.fetchall()
def get_by_id(self, user_id):
s7.check_expiration()
cursor = self.conn.execute('SELECT * FROM users WHERE id = ?', (user_id,))
return cursor.fetchone()
def create(self, name, email):
s7.check_expiration()
self.conn.execute('INSERT INTO users (name, email) VALUES (?, ?)', (name, email))
self.conn.commit()
db = Database()
@app.before_request
def check_expiration_middleware():
s7.check_expiration()
@app.route('/api/users', methods=['GET'])
def get_users():
s7.check_expiration()
users = db.get_all()
return jsonify({'users': users})
@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
s7.check_expiration()
user = db.get_by_id(user_id)
if user:
return jsonify({'user': user})
return jsonify({'error': 'Not found'}), 404
@app.route('/api/users', methods=['POST'])
def create_user():
s7.check_expiration()
data = request.json
db.create(data['name'], data['email'])
return jsonify({'status': 'created'}), 201
if __name__ == '__main__':
app.run(debug=True)
```
### Complete CLI Application
```python
import s7
import click
import json
s7.set_expiration(2025, 12, 31)
class TodoManager:
def __init__(self, filename='todos.json'):
s7.check_expiration()
self.filename = filename
self.todos = self.load()
def load(self):
s7.check_expiration()
try:
with open(self.filename, 'r') as f:
return json.load(f)
except FileNotFoundError:
return []
def save(self):
s7.check_expiration()
with open(self.filename, 'w') as f:
json.dump(self.todos, f, indent=2)
def add(self, task):
s7.check_expiration()
self.todos.append({'task': task, 'done': False})
self.save()
def list(self):
s7.check_expiration()
return self.todos
def complete(self, index):
s7.check_expiration()
if 0 <= index < len(self.todos):
self.todos[index]['done'] = True
self.save()
manager = TodoManager()
@click.group()
def cli():
s7.check_expiration()
pass
@cli.command()
@click.argument('task')
def add(task):
s7.check_expiration()
manager.add(task)
click.echo(f'Added: {task}')
@cli.command()
def list():
s7.check_expiration()
todos = manager.list()
for i, todo in enumerate(todos):
status = '✓' if todo['done'] else ' '
click.echo(f'{i}. [{status}] {todo["task"]}')
@cli.command()
@click.argument('index', type=int)
def done(index):
s7.check_expiration()
manager.complete(index)
click.echo(f'Completed task {index}')
if __name__ == '__main__':
cli()
```
### Complete Service Application
```python
import s7
import time
import logging
from threading import Thread
s7.set_expiration(2025, 12, 31)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Service:
def __init__(self):
s7.check_expiration()
self.running = False
self.threads = []
def start(self):
s7.check_expiration()
logger.info("Starting service")
self.running = True
self.threads.append(Thread(target=self.monitor_loop))
self.threads.append(Thread(target=self.worker_loop))
for thread in self.threads:
thread.start()
def stop(self):
s7.check_expiration()
logger.info("Stopping service")
self.running = False
for thread in self.threads:
thread.join()
def monitor_loop(self):
while self.running:
s7.check_expiration()
logger.info("Monitor tick")
time.sleep(10)
def worker_loop(self):
while self.running:
s7.check_expiration()
self.process_task()
time.sleep(5)
def process_task(self):
s7.check_expiration()
logger.info("Processing task")
if __name__ == '__main__':
service = Service()
try:
service.start()
while True:
time.sleep(1)
except KeyboardInterrupt:
service.stop()
```
---
## Package Information
```python
import s7
print(s7.__version__)
print(s7.__author__)
print(s7.__telegram__)
```
```python
from s7 import __all__
print(__all__)
```
---
## License
MIT License
Copyright (c) 2025 san
Contact: @ii00hh on Telegram
---
## Build & Publish
```bash
pip install build twine
python -m build
twine check dist/*
twine upload dist/*
```
---
Raw data
{
"_id": null,
"home_page": null,
"name": "san7",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.7",
"maintainer_email": null,
"keywords": "expiration, time-based, security, protection, licensing",
"author": "san",
"author_email": null,
"download_url": null,
"platform": null,
"description": "# s7\r\n\r\nAdvanced time-based expiration and protection system for Python applications\r\n\r\nVersion: 1.0.0 \r\nDeveloper: san \r\nContact: @ii00hh\r\n\r\n---\r\n\r\n## Installation\r\n\r\n```bash\r\npip install s7\r\n```\r\n\r\n---\r\n\r\n## Basic Usage\r\n\r\n```python\r\nimport s7\r\n\r\ns7.set_expiration(2025, 12, 31)\r\ns7.check_expiration()\r\n```\r\n\r\n```python\r\nimport s7\r\n\r\ns7.set_expiration(2025, 6, 15, 14, 30, 0)\r\ns7.check_expiration()\r\n```\r\n\r\n```python\r\nfrom s7 import ExpireDate\r\n\r\nexpire = ExpireDate()\r\nexpire.set_date(2025, 12, 31, 23, 59, 59)\r\nexpire.check()\r\n```\r\n\r\n---\r\n\r\n## Complete Examples\r\n\r\n### Simple Tool Protection\r\n\r\n```python\r\nimport s7\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\ndef main():\r\n s7.check_expiration()\r\n print(\"Tool is running...\")\r\n \r\nmain()\r\n```\r\n\r\n### CLI Tool\r\n\r\n```python\r\nimport s7\r\nimport argparse\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\ndef process_file(filename):\r\n s7.check_expiration()\r\n with open(filename, 'r') as f:\r\n data = f.read()\r\n return data\r\n\r\ndef main():\r\n s7.check_expiration()\r\n parser = argparse.ArgumentParser()\r\n parser.add_argument('--input', required=True)\r\n args = parser.parse_args()\r\n \r\n result = process_file(args.input)\r\n print(result)\r\n\r\nif __name__ == \"__main__\":\r\n main()\r\n```\r\n\r\n### Web Application\r\n\r\n```python\r\nfrom flask import Flask\r\nimport s7\r\n\r\napp = Flask(__name__)\r\ns7.set_expiration(2025, 12, 31)\r\n\r\n@app.before_request\r\ndef check_validity():\r\n s7.check_expiration()\r\n\r\n@app.route('/')\r\ndef index():\r\n return \"Service Active\"\r\n\r\n@app.route('/api/data')\r\ndef get_data():\r\n s7.check_expiration()\r\n return {\"status\": \"ok\", \"data\": [1, 2, 3]}\r\n\r\nif __name__ == '__main__':\r\n app.run()\r\n```\r\n\r\n### GUI Application\r\n\r\n```python\r\nimport s7\r\nimport tkinter as tk\r\n\r\nclass App(tk.Tk):\r\n def __init__(self):\r\n s7.set_expiration(2025, 12, 31)\r\n s7.check_expiration()\r\n \r\n super().__init__()\r\n self.title(\"Protected App\")\r\n self.geometry(\"400x300\")\r\n \r\n self.label = tk.Label(self, text=\"Application Running\")\r\n self.label.pack(pady=20)\r\n \r\n self.button = tk.Button(self, text=\"Execute\", command=self.execute)\r\n self.button.pack()\r\n \r\n self.after(1000, self.periodic_check)\r\n \r\n def execute(self):\r\n s7.check_expiration()\r\n self.label.config(text=\"Task Executed\")\r\n \r\n def periodic_check(self):\r\n s7.check_expiration()\r\n self.after(1000, self.periodic_check)\r\n\r\nif __name__ == \"__main__\":\r\n app = App()\r\n app.mainloop()\r\n```\r\n\r\n### Data Processing Script\r\n\r\n```python\r\nimport s7\r\nimport pandas as pd\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\ndef load_data(filepath):\r\n s7.check_expiration()\r\n return pd.read_csv(filepath)\r\n\r\ndef process_data(df):\r\n s7.check_expiration()\r\n df['new_column'] = df['value'] * 2\r\n return df\r\n\r\ndef save_data(df, filepath):\r\n s7.check_expiration()\r\n df.to_csv(filepath, index=False)\r\n\r\ndef main():\r\n s7.check_expiration()\r\n \r\n data = load_data('input.csv')\r\n processed = process_data(data)\r\n save_data(processed, 'output.csv')\r\n \r\n print(\"Processing complete\")\r\n\r\nif __name__ == \"__main__\":\r\n main()\r\n```\r\n\r\n### API Client\r\n\r\n```python\r\nimport s7\r\nimport requests\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass APIClient:\r\n def __init__(self, base_url):\r\n s7.check_expiration()\r\n self.base_url = base_url\r\n self.session = requests.Session()\r\n \r\n def get(self, endpoint):\r\n s7.check_expiration()\r\n response = self.session.get(f\"{self.base_url}{endpoint}\")\r\n return response.json()\r\n \r\n def post(self, endpoint, data):\r\n s7.check_expiration()\r\n response = self.session.post(f\"{self.base_url}{endpoint}\", json=data)\r\n return response.json()\r\n\r\nclient = APIClient(\"https://api.example.com\")\r\ndata = client.get(\"/users\")\r\nresult = client.post(\"/process\", {\"key\": \"value\"})\r\n```\r\n\r\n### Bot Application\r\n\r\n```python\r\nimport s7\r\nimport time\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass Bot:\r\n def __init__(self):\r\n s7.check_expiration()\r\n self.running = True\r\n \r\n def start(self):\r\n s7.check_expiration()\r\n print(\"Bot started\")\r\n \r\n while self.running:\r\n s7.check_expiration()\r\n self.process_messages()\r\n time.sleep(1)\r\n \r\n def process_messages(self):\r\n s7.check_expiration()\r\n pass\r\n \r\n def stop(self):\r\n s7.check_expiration()\r\n self.running = False\r\n\r\nbot = Bot()\r\nbot.start()\r\n```\r\n\r\n### Automated Task\r\n\r\n```python\r\nimport s7\r\nimport schedule\r\nimport time\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\ndef job():\r\n s7.check_expiration()\r\n print(\"Executing scheduled task\")\r\n\r\nschedule.every(10).minutes.do(job)\r\nschedule.every().hour.do(job)\r\nschedule.every().day.at(\"10:30\").do(job)\r\n\r\nwhile True:\r\n s7.check_expiration()\r\n schedule.run_pending()\r\n time.sleep(1)\r\n```\r\n\r\n### Database Tool\r\n\r\n```python\r\nimport s7\r\nimport sqlite3\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass Database:\r\n def __init__(self, db_path):\r\n s7.check_expiration()\r\n self.conn = sqlite3.connect(db_path)\r\n self.cursor = self.conn.cursor()\r\n \r\n def query(self, sql, params=()):\r\n s7.check_expiration()\r\n self.cursor.execute(sql, params)\r\n return self.cursor.fetchall()\r\n \r\n def execute(self, sql, params=()):\r\n s7.check_expiration()\r\n self.cursor.execute(sql, params)\r\n self.conn.commit()\r\n \r\n def close(self):\r\n s7.check_expiration()\r\n self.conn.close()\r\n\r\ndb = Database('data.db')\r\nresults = db.query(\"SELECT * FROM users WHERE id = ?\", (1,))\r\ndb.execute(\"INSERT INTO logs (message) VALUES (?)\", (\"Event logged\",))\r\ndb.close()\r\n```\r\n\r\n### File Monitor\r\n\r\n```python\r\nimport s7\r\nimport os\r\nimport time\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass FileMonitor:\r\n def __init__(self, directory):\r\n s7.check_expiration()\r\n self.directory = directory\r\n self.files = {}\r\n \r\n def scan(self):\r\n s7.check_expiration()\r\n for filename in os.listdir(self.directory):\r\n filepath = os.path.join(self.directory, filename)\r\n mtime = os.path.getmtime(filepath)\r\n \r\n if filename not in self.files:\r\n print(f\"New file: {filename}\")\r\n elif self.files[filename] != mtime:\r\n print(f\"Modified: {filename}\")\r\n \r\n self.files[filename] = mtime\r\n \r\n def monitor(self):\r\n while True:\r\n s7.check_expiration()\r\n self.scan()\r\n time.sleep(5)\r\n\r\nmonitor = FileMonitor('/path/to/watch')\r\nmonitor.monitor()\r\n```\r\n\r\n### API Server\r\n\r\n```python\r\nfrom fastapi import FastAPI\r\nimport s7\r\n\r\napp = FastAPI()\r\ns7.set_expiration(2025, 12, 31)\r\n\r\n@app.middleware(\"http\")\r\nasync def check_expiration_middleware(request, call_next):\r\n s7.check_expiration()\r\n response = await call_next(request)\r\n return response\r\n\r\n@app.get(\"/\")\r\ndef read_root():\r\n s7.check_expiration()\r\n return {\"status\": \"running\"}\r\n\r\n@app.get(\"/items/{item_id}\")\r\ndef read_item(item_id: int):\r\n s7.check_expiration()\r\n return {\"item_id\": item_id, \"name\": f\"Item {item_id}\"}\r\n\r\n@app.post(\"/process\")\r\ndef process_data(data: dict):\r\n s7.check_expiration()\r\n result = {\"processed\": True, \"data\": data}\r\n return result\r\n```\r\n\r\n### Scraper\r\n\r\n```python\r\nimport s7\r\nfrom bs4 import BeautifulSoup\r\nimport requests\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass Scraper:\r\n def __init__(self):\r\n s7.check_expiration()\r\n self.session = requests.Session()\r\n \r\n def fetch(self, url):\r\n s7.check_expiration()\r\n response = self.session.get(url)\r\n return response.text\r\n \r\n def parse(self, html):\r\n s7.check_expiration()\r\n soup = BeautifulSoup(html, 'html.parser')\r\n data = []\r\n \r\n for item in soup.find_all('div', class_='item'):\r\n s7.check_expiration()\r\n title = item.find('h2').text\r\n data.append({'title': title})\r\n \r\n return data\r\n \r\n def scrape(self, url):\r\n s7.check_expiration()\r\n html = self.fetch(url)\r\n data = self.parse(html)\r\n return data\r\n\r\nscraper = Scraper()\r\nresults = scraper.scrape(\"https://example.com\")\r\n```\r\n\r\n### Background Worker\r\n\r\n```python\r\nimport s7\r\nimport threading\r\nimport queue\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass Worker:\r\n def __init__(self):\r\n s7.check_expiration()\r\n self.queue = queue.Queue()\r\n self.running = True\r\n \r\n def add_task(self, task):\r\n s7.check_expiration()\r\n self.queue.put(task)\r\n \r\n def process(self):\r\n while self.running:\r\n s7.check_expiration()\r\n try:\r\n task = self.queue.get(timeout=1)\r\n self.execute(task)\r\n self.queue.task_done()\r\n except queue.Empty:\r\n continue\r\n \r\n def execute(self, task):\r\n s7.check_expiration()\r\n print(f\"Executing: {task}\")\r\n \r\n def start(self):\r\n s7.check_expiration()\r\n thread = threading.Thread(target=self.process)\r\n thread.start()\r\n\r\nworker = Worker()\r\nworker.start()\r\nworker.add_task(\"Task 1\")\r\nworker.add_task(\"Task 2\")\r\n```\r\n\r\n### Image Processor\r\n\r\n```python\r\nimport s7\r\nfrom PIL import Image\r\nimport os\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass ImageProcessor:\r\n def __init__(self, input_dir, output_dir):\r\n s7.check_expiration()\r\n self.input_dir = input_dir\r\n self.output_dir = output_dir\r\n \r\n def resize(self, image_path, size=(800, 600)):\r\n s7.check_expiration()\r\n img = Image.open(image_path)\r\n img = img.resize(size)\r\n return img\r\n \r\n def process_all(self):\r\n s7.check_expiration()\r\n for filename in os.listdir(self.input_dir):\r\n s7.check_expiration()\r\n if filename.endswith(('.png', '.jpg', '.jpeg')):\r\n input_path = os.path.join(self.input_dir, filename)\r\n output_path = os.path.join(self.output_dir, filename)\r\n \r\n img = self.resize(input_path)\r\n img.save(output_path)\r\n print(f\"Processed: {filename}\")\r\n\r\nprocessor = ImageProcessor('input', 'output')\r\nprocessor.process_all()\r\n```\r\n\r\n### Email Tool\r\n\r\n```python\r\nimport s7\r\nimport smtplib\r\nfrom email.mime.text import MIMEText\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass EmailSender:\r\n def __init__(self, smtp_server, port, username, password):\r\n s7.check_expiration()\r\n self.smtp_server = smtp_server\r\n self.port = port\r\n self.username = username\r\n self.password = password\r\n \r\n def send(self, to, subject, body):\r\n s7.check_expiration()\r\n msg = MIMEText(body)\r\n msg['Subject'] = subject\r\n msg['From'] = self.username\r\n msg['To'] = to\r\n \r\n with smtplib.SMTP(self.smtp_server, self.port) as server:\r\n server.starttls()\r\n server.login(self.username, self.password)\r\n server.send_message(msg)\r\n\r\nsender = EmailSender('smtp.gmail.com', 587, 'user@example.com', 'password')\r\nsender.send('recipient@example.com', 'Subject', 'Message body')\r\n```\r\n\r\n### Logger System\r\n\r\n```python\r\nimport s7\r\nimport logging\r\nfrom datetime import datetime\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass Logger:\r\n def __init__(self, log_file):\r\n s7.check_expiration()\r\n self.log_file = log_file\r\n logging.basicConfig(\r\n filename=log_file,\r\n level=logging.INFO,\r\n format='%(asctime)s - %(levelname)s - %(message)s'\r\n )\r\n \r\n def info(self, message):\r\n s7.check_expiration()\r\n logging.info(message)\r\n \r\n def error(self, message):\r\n s7.check_expiration()\r\n logging.error(message)\r\n \r\n def warning(self, message):\r\n s7.check_expiration()\r\n logging.warning(message)\r\n\r\nlogger = Logger('app.log')\r\nlogger.info(\"Application started\")\r\nlogger.error(\"Error occurred\")\r\nlogger.warning(\"Warning message\")\r\n```\r\n\r\n### Cache System\r\n\r\n```python\r\nimport s7\r\nimport time\r\nimport pickle\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass Cache:\r\n def __init__(self, ttl=3600):\r\n s7.check_expiration()\r\n self.cache = {}\r\n self.ttl = ttl\r\n \r\n def set(self, key, value):\r\n s7.check_expiration()\r\n self.cache[key] = {\r\n 'value': value,\r\n 'timestamp': time.time()\r\n }\r\n \r\n def get(self, key):\r\n s7.check_expiration()\r\n if key not in self.cache:\r\n return None\r\n \r\n item = self.cache[key]\r\n if time.time() - item['timestamp'] > self.ttl:\r\n del self.cache[key]\r\n return None\r\n \r\n return item['value']\r\n \r\n def clear(self):\r\n s7.check_expiration()\r\n self.cache.clear()\r\n\r\ncache = Cache(ttl=300)\r\ncache.set('user:1', {'name': 'John', 'age': 30})\r\nuser = cache.get('user:1')\r\n```\r\n\r\n### Config Manager\r\n\r\n```python\r\nimport s7\r\nimport json\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass Config:\r\n def __init__(self, config_file):\r\n s7.check_expiration()\r\n self.config_file = config_file\r\n self.data = {}\r\n self.load()\r\n \r\n def load(self):\r\n s7.check_expiration()\r\n try:\r\n with open(self.config_file, 'r') as f:\r\n self.data = json.load(f)\r\n except FileNotFoundError:\r\n self.data = {}\r\n \r\n def save(self):\r\n s7.check_expiration()\r\n with open(self.config_file, 'w') as f:\r\n json.dump(self.data, f, indent=2)\r\n \r\n def get(self, key, default=None):\r\n s7.check_expiration()\r\n return self.data.get(key, default)\r\n \r\n def set(self, key, value):\r\n s7.check_expiration()\r\n self.data[key] = value\r\n self.save()\r\n\r\nconfig = Config('config.json')\r\nconfig.set('api_key', 'abc123')\r\napi_key = config.get('api_key')\r\n```\r\n\r\n### Report Generator\r\n\r\n```python\r\nimport s7\r\nfrom datetime import datetime\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass ReportGenerator:\r\n def __init__(self):\r\n s7.check_expiration()\r\n self.data = []\r\n \r\n def add_entry(self, entry):\r\n s7.check_expiration()\r\n self.data.append({\r\n 'timestamp': datetime.now().isoformat(),\r\n 'entry': entry\r\n })\r\n \r\n def generate(self, output_file):\r\n s7.check_expiration()\r\n with open(output_file, 'w') as f:\r\n f.write(\"REPORT\\n\")\r\n f.write(\"=\" * 50 + \"\\n\\n\")\r\n \r\n for item in self.data:\r\n s7.check_expiration()\r\n f.write(f\"{item['timestamp']}: {item['entry']}\\n\")\r\n \r\n print(f\"Report generated: {output_file}\")\r\n\r\nreport = ReportGenerator()\r\nreport.add_entry(\"System started\")\r\nreport.add_entry(\"Processing complete\")\r\nreport.generate(\"report.txt\")\r\n```\r\n\r\n### Notification System\r\n\r\n```python\r\nimport s7\r\nimport requests\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass Notifier:\r\n def __init__(self, webhook_url):\r\n s7.check_expiration()\r\n self.webhook_url = webhook_url\r\n \r\n def send(self, message, level='info'):\r\n s7.check_expiration()\r\n payload = {\r\n 'text': message,\r\n 'level': level\r\n }\r\n response = requests.post(self.webhook_url, json=payload)\r\n return response.status_code == 200\r\n \r\n def info(self, message):\r\n s7.check_expiration()\r\n self.send(message, 'info')\r\n \r\n def alert(self, message):\r\n s7.check_expiration()\r\n self.send(message, 'alert')\r\n\r\nnotifier = Notifier('https://hooks.example.com/webhook')\r\nnotifier.info(\"Task completed\")\r\nnotifier.alert(\"Error detected\")\r\n```\r\n\r\n### Command Executor\r\n\r\n```python\r\nimport s7\r\nimport subprocess\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass CommandExecutor:\r\n def __init__(self):\r\n s7.check_expiration()\r\n self.history = []\r\n \r\n def run(self, command):\r\n s7.check_expiration()\r\n result = subprocess.run(\r\n command,\r\n shell=True,\r\n capture_output=True,\r\n text=True\r\n )\r\n \r\n self.history.append({\r\n 'command': command,\r\n 'returncode': result.returncode,\r\n 'stdout': result.stdout,\r\n 'stderr': result.stderr\r\n })\r\n \r\n return result\r\n \r\n def get_history(self):\r\n s7.check_expiration()\r\n return self.history\r\n\r\nexecutor = CommandExecutor()\r\nresult = executor.run('ls -la')\r\nprint(result.stdout)\r\n```\r\n\r\n### Data Validator\r\n\r\n```python\r\nimport s7\r\nimport re\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass Validator:\r\n def __init__(self):\r\n s7.check_expiration()\r\n pass\r\n \r\n def email(self, value):\r\n s7.check_expiration()\r\n pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'\r\n return bool(re.match(pattern, value))\r\n \r\n def phone(self, value):\r\n s7.check_expiration()\r\n pattern = r'^\\+?1?\\d{9,15}$'\r\n return bool(re.match(pattern, value))\r\n \r\n def url(self, value):\r\n s7.check_expiration()\r\n pattern = r'^https?://[^\\s]+$'\r\n return bool(re.match(pattern, value))\r\n \r\n def validate_dict(self, data, rules):\r\n s7.check_expiration()\r\n errors = {}\r\n \r\n for field, rule in rules.items():\r\n s7.check_expiration()\r\n if field not in data:\r\n errors[field] = \"Required\"\r\n elif rule == 'email' and not self.email(data[field]):\r\n errors[field] = \"Invalid email\"\r\n \r\n return errors\r\n\r\nvalidator = Validator()\r\nprint(validator.email(\"test@example.com\"))\r\nprint(validator.phone(\"+1234567890\"))\r\nerrors = validator.validate_dict(\r\n {'email': 'invalid'},\r\n {'email': 'email', 'phone': 'phone'}\r\n)\r\n```\r\n\r\n### Batch Processor\r\n\r\n```python\r\nimport s7\r\nimport time\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass BatchProcessor:\r\n def __init__(self, batch_size=100):\r\n s7.check_expiration()\r\n self.batch_size = batch_size\r\n \r\n def process(self, items, process_func):\r\n s7.check_expiration()\r\n results = []\r\n \r\n for i in range(0, len(items), self.batch_size):\r\n s7.check_expiration()\r\n batch = items[i:i + self.batch_size]\r\n \r\n for item in batch:\r\n s7.check_expiration()\r\n result = process_func(item)\r\n results.append(result)\r\n \r\n time.sleep(0.1)\r\n \r\n return results\r\n\r\ndef process_item(item):\r\n return item * 2\r\n\r\nprocessor = BatchProcessor(batch_size=50)\r\nitems = list(range(1000))\r\nresults = processor.process(items, process_item)\r\n```\r\n\r\n### State Machine\r\n\r\n```python\r\nimport s7\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass StateMachine:\r\n def __init__(self, initial_state):\r\n s7.check_expiration()\r\n self.state = initial_state\r\n self.transitions = {}\r\n \r\n def add_transition(self, from_state, to_state, condition):\r\n s7.check_expiration()\r\n if from_state not in self.transitions:\r\n self.transitions[from_state] = []\r\n self.transitions[from_state].append((to_state, condition))\r\n \r\n def transition(self, event):\r\n s7.check_expiration()\r\n if self.state in self.transitions:\r\n for to_state, condition in self.transitions[self.state]:\r\n if condition(event):\r\n self.state = to_state\r\n return True\r\n return False\r\n \r\n def get_state(self):\r\n s7.check_expiration()\r\n return self.state\r\n\r\nsm = StateMachine('idle')\r\nsm.add_transition('idle', 'running', lambda e: e == 'start')\r\nsm.add_transition('running', 'idle', lambda e: e == 'stop')\r\nsm.transition('start')\r\nprint(sm.get_state())\r\n```\r\n\r\n### Task Queue\r\n\r\n```python\r\nimport s7\r\nimport queue\r\nimport threading\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass TaskQueue:\r\n def __init__(self, num_workers=4):\r\n s7.check_expiration()\r\n self.queue = queue.Queue()\r\n self.workers = []\r\n \r\n for i in range(num_workers):\r\n s7.check_expiration()\r\n worker = threading.Thread(target=self.worker)\r\n worker.daemon = True\r\n worker.start()\r\n self.workers.append(worker)\r\n \r\n def worker(self):\r\n while True:\r\n s7.check_expiration()\r\n task = self.queue.get()\r\n if task is None:\r\n break\r\n \r\n func, args = task\r\n func(*args)\r\n self.queue.task_done()\r\n \r\n def add_task(self, func, *args):\r\n s7.check_expiration()\r\n self.queue.put((func, args))\r\n \r\n def wait(self):\r\n s7.check_expiration()\r\n self.queue.join()\r\n\r\ndef process_task(task_id):\r\n print(f\"Processing task {task_id}\")\r\n\r\ntq = TaskQueue(num_workers=2)\r\nfor i in range(10):\r\n tq.add_task(process_task, i)\r\ntq.wait()\r\n```\r\n\r\n### Plugin System\r\n\r\n```python\r\nimport s7\r\nimport importlib\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass PluginManager:\r\n def __init__(self):\r\n s7.check_expiration()\r\n self.plugins = {}\r\n \r\n def load(self, plugin_name):\r\n s7.check_expiration()\r\n module = importlib.import_module(plugin_name)\r\n plugin = module.Plugin()\r\n self.plugins[plugin_name] = plugin\r\n return plugin\r\n \r\n def execute(self, plugin_name, method, *args):\r\n s7.check_expiration()\r\n if plugin_name in self.plugins:\r\n plugin = self.plugins[plugin_name]\r\n return getattr(plugin, method)(*args)\r\n \r\n def list_plugins(self):\r\n s7.check_expiration()\r\n return list(self.plugins.keys())\r\n\r\nmanager = PluginManager()\r\n```\r\n\r\n### Event System\r\n\r\n```python\r\nimport s7\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass EventEmitter:\r\n def __init__(self):\r\n s7.check_expiration()\r\n self.listeners = {}\r\n \r\n def on(self, event, callback):\r\n s7.check_expiration()\r\n if event not in self.listeners:\r\n self.listeners[event] = []\r\n self.listeners[event].append(callback)\r\n \r\n def emit(self, event, *args):\r\n s7.check_expiration()\r\n if event in self.listeners:\r\n for callback in self.listeners[event]:\r\n s7.check_expiration()\r\n callback(*args)\r\n \r\n def off(self, event, callback):\r\n s7.check_expiration()\r\n if event in self.listeners:\r\n self.listeners[event].remove(callback)\r\n\r\nemitter = EventEmitter()\r\n\r\ndef on_data(data):\r\n print(f\"Received: {data}\")\r\n\r\nemitter.on('data', on_data)\r\nemitter.emit('data', 'test')\r\n```\r\n\r\n---\r\n\r\n## Advanced Class Usage\r\n\r\n```python\r\nfrom s7 import ExpireDate\r\n\r\nexpire = ExpireDate()\r\nexpire.set_date(2025, 12, 31, 23, 59, 59)\r\n\r\nwhile True:\r\n if not expire.check():\r\n break\r\n print(\"Running...\")\r\n```\r\n\r\n```python\r\nfrom s7 import ExpireDate, ProtectionSystem\r\n\r\nexpire = ExpireDate()\r\nexpire.set_date(2026, 1, 1)\r\n\r\nprotection = ProtectionSystem()\r\n\r\nfor i in range(100):\r\n expire.check()\r\n print(f\"Iteration {i}\")\r\n```\r\n\r\n```python\r\nfrom s7 import TimeValidator\r\n\r\nvalidator = TimeValidator()\r\n\r\nif validator.validate_system_time():\r\n timestamp = validator.get_secure_timestamp()\r\n print(f\"Secure timestamp: {timestamp}\")\r\n```\r\n\r\n```python\r\nfrom s7.net import NetworkTimeValidator\r\n\r\nntv = NetworkTimeValidator()\r\nnetwork_time = ntv.get_network_time()\r\nlocal_time = int(time.time())\r\n\r\nif ntv.validate_local_time(local_time):\r\n print(\"Time is valid\")\r\n```\r\n\r\n```python\r\nfrom s7.mem import MemoryProtection\r\n\r\nmem = MemoryProtection()\r\nmem.protect('secret_key', 'my_secret_value')\r\n\r\nif mem.verify('secret_key'):\r\n value = mem.get('secret_key')\r\n print(f\"Protected value: {value}\")\r\n```\r\n\r\n---\r\n\r\n## Multiple Checks Pattern\r\n\r\n```python\r\nimport s7\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\ndef critical_operation():\r\n s7.check_expiration()\r\n \r\n result = process_data()\r\n s7.check_expiration()\r\n \r\n save_result(result)\r\n s7.check_expiration()\r\n \r\n return result\r\n```\r\n\r\n```python\r\nimport s7\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass Application:\r\n def __init__(self):\r\n s7.check_expiration()\r\n self.data = []\r\n \r\n def add(self, item):\r\n s7.check_expiration()\r\n self.data.append(item)\r\n \r\n def process(self):\r\n s7.check_expiration()\r\n for item in self.data:\r\n s7.check_expiration()\r\n print(item)\r\n \r\n def save(self):\r\n s7.check_expiration()\r\n with open('data.txt', 'w') as f:\r\n for item in self.data:\r\n s7.check_expiration()\r\n f.write(f\"{item}\\n\")\r\n```\r\n\r\n---\r\n\r\n## Integration Patterns\r\n\r\n```python\r\nimport s7\r\nfrom functools import wraps\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\ndef protected(func):\r\n @wraps(func)\r\n def wrapper(*args, **kwargs):\r\n s7.check_expiration()\r\n return func(*args, **kwargs)\r\n return wrapper\r\n\r\n@protected\r\ndef my_function():\r\n print(\"Function executed\")\r\n\r\n@protected\r\ndef another_function(x, y):\r\n return x + y\r\n```\r\n\r\n```python\r\nimport s7\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass ProtectedClass:\r\n def __init__(self):\r\n s7.check_expiration()\r\n \r\n def __enter__(self):\r\n s7.check_expiration()\r\n return self\r\n \r\n def __exit__(self, *args):\r\n s7.check_expiration()\r\n \r\n def execute(self):\r\n s7.check_expiration()\r\n print(\"Executing\")\r\n\r\nwith ProtectedClass() as pc:\r\n pc.execute()\r\n```\r\n\r\n---\r\n\r\n## Complete Application Examples\r\n\r\n### Complete REST API\r\n\r\n```python\r\nfrom flask import Flask, request, jsonify\r\nimport s7\r\nimport sqlite3\r\n\r\napp = Flask(__name__)\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass Database:\r\n def __init__(self):\r\n s7.check_expiration()\r\n self.conn = sqlite3.connect('app.db', check_same_thread=False)\r\n self.init_db()\r\n \r\n def init_db(self):\r\n s7.check_expiration()\r\n self.conn.execute('''\r\n CREATE TABLE IF NOT EXISTS users (\r\n id INTEGER PRIMARY KEY,\r\n name TEXT,\r\n email TEXT\r\n )\r\n ''')\r\n self.conn.commit()\r\n \r\n def get_all(self):\r\n s7.check_expiration()\r\n cursor = self.conn.execute('SELECT * FROM users')\r\n return cursor.fetchall()\r\n \r\n def get_by_id(self, user_id):\r\n s7.check_expiration()\r\n cursor = self.conn.execute('SELECT * FROM users WHERE id = ?', (user_id,))\r\n return cursor.fetchone()\r\n \r\n def create(self, name, email):\r\n s7.check_expiration()\r\n self.conn.execute('INSERT INTO users (name, email) VALUES (?, ?)', (name, email))\r\n self.conn.commit()\r\n\r\ndb = Database()\r\n\r\n@app.before_request\r\ndef check_expiration_middleware():\r\n s7.check_expiration()\r\n\r\n@app.route('/api/users', methods=['GET'])\r\ndef get_users():\r\n s7.check_expiration()\r\n users = db.get_all()\r\n return jsonify({'users': users})\r\n\r\n@app.route('/api/users/<int:user_id>', methods=['GET'])\r\ndef get_user(user_id):\r\n s7.check_expiration()\r\n user = db.get_by_id(user_id)\r\n if user:\r\n return jsonify({'user': user})\r\n return jsonify({'error': 'Not found'}), 404\r\n\r\n@app.route('/api/users', methods=['POST'])\r\ndef create_user():\r\n s7.check_expiration()\r\n data = request.json\r\n db.create(data['name'], data['email'])\r\n return jsonify({'status': 'created'}), 201\r\n\r\nif __name__ == '__main__':\r\n app.run(debug=True)\r\n```\r\n\r\n### Complete CLI Application\r\n\r\n```python\r\nimport s7\r\nimport click\r\nimport json\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nclass TodoManager:\r\n def __init__(self, filename='todos.json'):\r\n s7.check_expiration()\r\n self.filename = filename\r\n self.todos = self.load()\r\n \r\n def load(self):\r\n s7.check_expiration()\r\n try:\r\n with open(self.filename, 'r') as f:\r\n return json.load(f)\r\n except FileNotFoundError:\r\n return []\r\n \r\n def save(self):\r\n s7.check_expiration()\r\n with open(self.filename, 'w') as f:\r\n json.dump(self.todos, f, indent=2)\r\n \r\n def add(self, task):\r\n s7.check_expiration()\r\n self.todos.append({'task': task, 'done': False})\r\n self.save()\r\n \r\n def list(self):\r\n s7.check_expiration()\r\n return self.todos\r\n \r\n def complete(self, index):\r\n s7.check_expiration()\r\n if 0 <= index < len(self.todos):\r\n self.todos[index]['done'] = True\r\n self.save()\r\n\r\nmanager = TodoManager()\r\n\r\n@click.group()\r\ndef cli():\r\n s7.check_expiration()\r\n pass\r\n\r\n@cli.command()\r\n@click.argument('task')\r\ndef add(task):\r\n s7.check_expiration()\r\n manager.add(task)\r\n click.echo(f'Added: {task}')\r\n\r\n@cli.command()\r\ndef list():\r\n s7.check_expiration()\r\n todos = manager.list()\r\n for i, todo in enumerate(todos):\r\n status = '\u2713' if todo['done'] else ' '\r\n click.echo(f'{i}. [{status}] {todo[\"task\"]}')\r\n\r\n@cli.command()\r\n@click.argument('index', type=int)\r\ndef done(index):\r\n s7.check_expiration()\r\n manager.complete(index)\r\n click.echo(f'Completed task {index}')\r\n\r\nif __name__ == '__main__':\r\n cli()\r\n```\r\n\r\n### Complete Service Application\r\n\r\n```python\r\nimport s7\r\nimport time\r\nimport logging\r\nfrom threading import Thread\r\n\r\ns7.set_expiration(2025, 12, 31)\r\n\r\nlogging.basicConfig(level=logging.INFO)\r\nlogger = logging.getLogger(__name__)\r\n\r\nclass Service:\r\n def __init__(self):\r\n s7.check_expiration()\r\n self.running = False\r\n self.threads = []\r\n \r\n def start(self):\r\n s7.check_expiration()\r\n logger.info(\"Starting service\")\r\n self.running = True\r\n \r\n self.threads.append(Thread(target=self.monitor_loop))\r\n self.threads.append(Thread(target=self.worker_loop))\r\n \r\n for thread in self.threads:\r\n thread.start()\r\n \r\n def stop(self):\r\n s7.check_expiration()\r\n logger.info(\"Stopping service\")\r\n self.running = False\r\n \r\n for thread in self.threads:\r\n thread.join()\r\n \r\n def monitor_loop(self):\r\n while self.running:\r\n s7.check_expiration()\r\n logger.info(\"Monitor tick\")\r\n time.sleep(10)\r\n \r\n def worker_loop(self):\r\n while self.running:\r\n s7.check_expiration()\r\n self.process_task()\r\n time.sleep(5)\r\n \r\n def process_task(self):\r\n s7.check_expiration()\r\n logger.info(\"Processing task\")\r\n\r\nif __name__ == '__main__':\r\n service = Service()\r\n try:\r\n service.start()\r\n while True:\r\n time.sleep(1)\r\n except KeyboardInterrupt:\r\n service.stop()\r\n```\r\n\r\n---\r\n\r\n## Package Information\r\n\r\n```python\r\nimport s7\r\n\r\nprint(s7.__version__)\r\nprint(s7.__author__)\r\nprint(s7.__telegram__)\r\n```\r\n\r\n```python\r\nfrom s7 import __all__\r\n\r\nprint(__all__)\r\n```\r\n\r\n---\r\n\r\n## License\r\n\r\nMIT License\r\n\r\nCopyright (c) 2025 san\r\n\r\nContact: @ii00hh on Telegram\r\n\r\n---\r\n\r\n## Build & Publish\r\n\r\n```bash\r\npip install build twine\r\npython -m build\r\ntwine check dist/*\r\ntwine upload dist/*\r\n```\r\n\r\n---\r\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Advanced time-based expiration and protection system for Python applications",
"version": "1.1.0",
"project_urls": {
"Homepage": "https://t.me/ii00hh"
},
"split_keywords": [
"expiration",
" time-based",
" security",
" protection",
" licensing"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "54372b1891dfb36f582d13d6ba6ad738ca3b0fcb1439dbc765f19f68755636b9",
"md5": "a3133283f324754c0e573efd46c1e0a8",
"sha256": "73ca53d70cc138778b32e06d19c90abee8555e6bc41527aabdf4f9c4142f88f3"
},
"downloads": -1,
"filename": "san7-1.1.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "a3133283f324754c0e573efd46c1e0a8",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.7",
"size": 16262,
"upload_time": "2025-11-09T10:22:22",
"upload_time_iso_8601": "2025-11-09T10:22:22.507590Z",
"url": "https://files.pythonhosted.org/packages/54/37/2b1891dfb36f582d13d6ba6ad738ca3b0fcb1439dbc765f19f68755636b9/san7-1.1.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-11-09 10:22:22",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "san7"
}