In this tutorial, we walk through the design and implementation of AsyncConfig, a modern, async-first configuration management library for Python. We build it from scratch to support type-safe, dataclass-based configuration loading; multiple configuration sources such as environment variables, files, and dictionaries; and hot reloading via watchdog. With a clean API and strong validation, AsyncConfig suits both development and production environments. Throughout the tutorial, we demonstrate its capabilities with simple, advanced, and validation-focused examples, all built on asyncio to support non-blocking workflows.
import asyncio
import json
import os
import yaml
from pathlib import Path
from typing import Any, Dict, Optional, Type, TypeVar, Union, get_type_hints
from dataclasses import dataclass, field, MISSING
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import logging

__version__ = "0.1.0"
__author__ = "AsyncConfig Team"

T = TypeVar('T')
logger = logging.getLogger(__name__)
We start by importing the Python modules our configuration system needs: asyncio for asynchronous operations, json and yaml for parsing files, dataclasses for structured configuration, and watchdog for hot reloading. Note that yaml (PyYAML) and watchdog are third-party packages and must be installed separately. We also define some metadata and set up a logger to track events throughout the system.
class ConfigError(Exception):
    """Base exception for configuration errors."""
    pass

class ValidationError(ConfigError):
    """Raised when configuration validation fails."""
    pass

class LoadError(ConfigError):
    """Raised when configuration loading fails."""
    pass

@dataclass
class ConfigSource:
    """Represents a configuration source with priority and reload capabilities."""
    path: Optional[Path] = None
    env_prefix: Optional[str] = None
    data: Optional[Dict[str, Any]] = None
    priority: int = 0
    watch: bool = False

    def __post_init__(self):
        # Accept plain strings and normalize them to Path objects.
        if self.path:
            self.path = Path(self.path)
We define a hierarchy of custom exceptions to handle different configuration-related errors, with ConfigError as the base class and the more specific ValidationError and LoadError for targeted troubleshooting. We also create the ConfigSource dataclass to represent a single configuration source, which can be a file, a set of environment variables, or a dictionary, and which carries a priority and an optional watch flag for hot reloading.
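To illustrate why the hierarchy is useful, here is a minimal sketch. The exception classes mirror the ones defined above; the `load_with_fallback` helper and the two failing loaders are hypothetical and exist only for this example. The idea is that a caller can recover from a `LoadError` while still letting a `ValidationError` surface, and a broad `except ConfigError` remains available as a catch-all.

```python
class ConfigError(Exception):
    """Base exception, mirroring the hierarchy defined above."""


class ValidationError(ConfigError):
    """Raised when configuration validation fails."""


class LoadError(ConfigError):
    """Raised when configuration loading fails."""


def load_with_fallback(loader, fallback):
    """Hypothetical helper: tolerate load failures, surface everything else."""
    try:
        return loader()
    except LoadError:
        # A missing or unreadable source is treated as recoverable here.
        return fallback


def missing_file():
    raise LoadError("Configuration file not found: app.yaml")


def bad_schema():
    raise ValidationError("Required field 'secret_key' not found")


# The LoadError is absorbed and the fallback is returned.
settings = load_with_fallback(missing_file, {"debug": False})

try:
    load_with_fallback(bad_schema, {})
except ConfigError as exc:
    # ValidationError is not swallowed, but the base class still catches it.
    caught = type(exc).__name__
```

Whether a load failure should really fall back to defaults depends on the application; the sketch only shows that the class hierarchy makes the choice possible.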
class ConfigWatcher(FileSystemEventHandler):
    """File system event handler for configuration hot reloading."""

    def __init__(self, config_manager, paths: list[Path]):
        super().__init__()
        self.config_manager = config_manager
        self.paths = {str(p.resolve()) for p in paths}
        # watchdog invokes handlers from its own thread, where no event loop
        # is running, so we capture the loop that created this watcher.
        self._loop = asyncio.get_running_loop()

    def on_modified(self, event):
        if not event.is_directory and event.src_path in self.paths:
            logger.info(f"Configuration file changed: {event.src_path}")
            # Hand the reload coroutine to the captured loop in a thread-safe way.
            asyncio.run_coroutine_threadsafe(
                self.config_manager._reload_config(), self._loop
            )
We create the ConfigWatcher class by extending FileSystemEventHandler to enable hot reloading of configuration files. It tracks the specified file paths and, whenever one of them changes, triggers an asynchronous reload of the configuration through the associated manager. This lets our application pick up configuration changes in real time without a restart.
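One detail worth illustrating: watchdog delivers events on its own observer thread, so any coroutine triggered from a handler has to be handed over to the asyncio event loop in a thread-safe way. The sketch below is a minimal illustration of that handoff using `asyncio.run_coroutine_threadsafe`. It uses a plain `threading.Thread` in place of watchdog, and the `reload_config` and `worker` names are hypothetical stand-ins, so it runs without touching the file system.

```python
import asyncio
import threading


async def reload_config(changed: list) -> None:
    """Stand-in for a manager's reload coroutine."""
    changed.append("reloaded")


def worker(loop: asyncio.AbstractEventLoop, changed: list) -> None:
    # Runs in a plain thread, much like a file-watcher callback would.
    # No event loop is running here, so the coroutine is submitted to
    # the main loop instead of being scheduled directly.
    future = asyncio.run_coroutine_threadsafe(reload_config(changed), loop)
    future.result(timeout=5)  # block this thread until the coroutine finishes


async def main() -> list:
    changed: list = []
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=worker, args=(loop, changed))
    thread.start()
    # Wait for the thread without blocking the event loop.
    await asyncio.to_thread(thread.join)
    return changed


result = asyncio.run(main())
```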
class AsyncConfigManager:
    """
    Modern async configuration manager with type safety and hot reloading.

    Features:
    - Async-first design
    - Type-safe configuration classes
    - Environment variable support
    - Hot reloading
    - Multiple source merging
    - Validation with detailed error messages
    """

    def __init__(self):
        self.sources: list[ConfigSource] = []
        self.observers: list[Observer] = []
        self.config_cache: Dict[str, Any] = {}
        self.reload_callbacks: list[callable] = []
        self._lock = asyncio.Lock()

    def add_source(self, source: ConfigSource) -> "AsyncConfigManager":
        """Add a configuration source."""
        self.sources.append(source)
        # Keep sources ordered from highest to lowest priority.
        self.sources.sort(key=lambda x: x.priority, reverse=True)
        return self

    def add_file(self, path: Union[str, Path], priority: int = 0, watch: bool = False) -> "AsyncConfigManager":
        """Add a file-based configuration source."""
        return self.add_source(ConfigSource(path=path, priority=priority, watch=watch))

    def add_env(self, prefix: str, priority: int = 100) -> "AsyncConfigManager":
        """Add environment variable source."""
        return self.add_source(ConfigSource(env_prefix=prefix, priority=priority))

    def add_dict(self, data: Dict[str, Any], priority: int = 50) -> "AsyncConfigManager":
        """Add dictionary-based configuration source."""
        return self.add_source(ConfigSource(data=data, priority=priority))

    async def load_config(self, config_class: Type[T]) -> T:
        """Load and validate configuration into a typed dataclass."""
        async with self._lock:
            config_data = await self._merge_sources()
            try:
                return self._validate_and_convert(config_data, config_class)
            except Exception as e:
                raise ValidationError(f"Failed to validate configuration: {e}") from e

    async def _merge_sources(self) -> Dict[str, Any]:
        """Merge configuration from all sources based on priority."""
        merged = {}
        # Apply lowest priority first so higher-priority sources overwrite it.
        for source in reversed(self.sources):
            try:
                data = await self._load_source(source)
                if data:
                    merged.update(data)
            except Exception as e:
                logger.warning(f"Failed to load source {source}: {e}")
        return merged

    async def _load_source(self, source: ConfigSource) -> Optional[Dict[str, Any]]:
        """Load data from a single configuration source."""
        if source.data:
            return source.data.copy()
        if source.path:
            return await self._load_file(source.path)
        if source.env_prefix:
            return self._load_env_vars(source.env_prefix)
        return None

    async def _load_file(self, path: Path) -> Dict[str, Any]:
        """Load configuration from a file."""
        if not path.exists():
            raise LoadError(f"Configuration file not found: {path}")
        try:
            # Read the file in a worker thread to avoid blocking the event loop.
            content = await asyncio.to_thread(path.read_text)
            if path.suffix.lower() == '.json':
                return json.loads(content)
            elif path.suffix.lower() in ['.yml', '.yaml']:
                return yaml.safe_load(content) or {}
            else:
                raise LoadError(f"Unsupported file format: {path.suffix}")
        except Exception as e:
            raise LoadError(f"Failed to load {path}: {e}")

    def _load_env_vars(self, prefix: str) -> Dict[str, Any]:
        """Load environment variables with given prefix."""
        env_vars = {}
        prefix = prefix.upper() + '_'
        for key, value in os.environ.items():
            if key.startswith(prefix):
                config_key = key[len(prefix):].lower()
                env_vars[config_key] = self._convert_env_value(value)
        return env_vars

    def _convert_env_value(self, value: str) -> Any:
        """Convert environment variable string to appropriate type."""
        if value.lower() in ('true', 'false'):
            return value.lower() == 'true'
        try:
            if '.' in value:
                return float(value)
            return int(value)
        except ValueError:
            pass
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            pass
        return value

    def _validate_and_convert(self, data: Dict[str, Any], config_class: Type[T]) -> T:
        """Validate and convert data to the specified configuration class."""
        if not hasattr(config_class, '__dataclass_fields__'):
            raise ValidationError(f"{config_class.__name__} must be a dataclass")
        type_hints = get_type_hints(config_class)
        field_values = {}
        for field_name, field_info in config_class.__dataclass_fields__.items():
            if field_name in data:
                field_value = data[field_name]
                # Prefer the resolved hint so string annotations also work.
                field_type = type_hints.get(field_name, field_info.type)
                if hasattr(field_type, '__dataclass_fields__'):
                    if isinstance(field_value, dict):
                        field_value = self._validate_and_convert(field_value, field_type)
                field_values[field_name] = field_value
            elif field_info.default is not MISSING:
                field_values[field_name] = field_info.default
            elif field_info.default_factory is not MISSING:
                field_values[field_name] = field_info.default_factory()
            else:
                raise ValidationError(f"Required field '{field_name}' not found in configuration")
        return config_class(**field_values)

    async def start_watching(self):
        """Start watching configuration files for changes."""
        watch_paths = []
        for source in self.sources:
            if source.path and source.watch:
                watch_paths.append(source.path)
        if watch_paths:
            observer = Observer()
            watcher = ConfigWatcher(self, watch_paths)
            for path in watch_paths:
                # Watch the resolved parent directory so reported event paths
                # line up with the resolved paths tracked by the watcher.
                observer.schedule(watcher, str(path.resolve().parent), recursive=False)
            observer.start()
            self.observers.append(observer)
            logger.info(f"Started watching {len(watch_paths)} configuration files")

    async def stop_watching(self):
        """Stop watching configuration files."""
        for observer in self.observers:
            observer.stop()
            observer.join()
        self.observers.clear()

    async def _reload_config(self):
        """Reload configuration from all sources."""
        try:
            self.config_cache.clear()
            for callback in self.reload_callbacks:
                await callback()
            logger.info("Configuration reloaded successfully")
        except Exception as e:
            logger.error(f"Failed to reload configuration: {e}")

    def on_reload(self, callback: callable):
        """Register a callback to be called when configuration is reloaded."""
        self.reload_callbacks.append(callback)

    async def __aenter__(self):
        await self.start_watching()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop_watching()
AsyncConfigManager is the core of our system. It acts as the central controller for all configuration operations: adding sources (files, environment variables, and dictionaries), merging them by priority, loading files asynchronously, and validating the result against typed dataclasses. The design is async-first, allowing non-blocking I/O, and an asyncio lock serializes concurrent loads. We also enable hot reloading by watching the specified configuration files and invoking registered callbacks whenever a change is detected. This setup provides a flexible, robust foundation for managing dynamic application configuration.
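The priority-based merge can be sketched in isolation. The example below is a simplified illustration, not the manager itself: the `merge_by_priority` function and the `(priority, data)` tuple format are assumptions made for this sketch. It applies sources from lowest to highest priority with `dict.update`, the same approach the manager takes, so later (higher-priority) values overwrite earlier ones.

```python
from typing import Any, Dict, List, Tuple


def merge_by_priority(sources: List[Tuple[int, Dict[str, Any]]]) -> Dict[str, Any]:
    """Merge (priority, data) pairs so higher priorities win on conflicts."""
    merged: Dict[str, Any] = {}
    # Apply lowest priority first; later updates overwrite earlier keys.
    for _, data in sorted(sources, key=lambda item: item[0]):
        merged.update(data)
    return merged


merged = merge_by_priority([
    (100, {"secret_key": "from-env"}),
    (0, {"secret_key": "from-file", "debug": False,
         "database": {"host": "a", "port": 1}}),
    (50, {"debug": True, "database": {"host": "b"}}),
])
```

Because `dict.update` is shallow, the nested `database` section from the priority-50 source replaces the lower-priority one wholesale, so `port` from the priority-0 source does not survive the merge. If per-key merging of nested sections is needed, a recursive merge would have to be added.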
async def load_config(config_class: Type[T],
                      config_file: Optional[Union[str, Path]] = None,
                      env_prefix: Optional[str] = None,
                      watch: bool = False) -> T:
    """
    Convenience function to quickly load configuration.

    Args:
        config_class: Dataclass to load configuration into
        config_file: Optional configuration file path
        env_prefix: Optional environment variable prefix
        watch: Whether to watch the file for changes

    Returns:
        Configured instance of config_class
    """
    manager = AsyncConfigManager()
    if config_file:
        manager.add_file(config_file, priority=0, watch=watch)
    if env_prefix:
        manager.add_env(env_prefix, priority=100)
    return await manager.load_config(config_class)
load_config is a convenience helper that simplifies setup. With a single call, we can load settings from a file, from environment variables, or from both into a dataclass, optionally watching the file for changes. It keeps the library beginner-friendly while the manager remains available for advanced use cases.
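Since environment variables are always strings, the env_prefix path depends on converting them to typed values. The sketch below reproduces that conversion logic as standalone functions so its behavior is easy to see. The names `convert_env_value`, `collect_env`, and the `fake_env` dictionary are assumptions for this example; a plain dict stands in for `os.environ` so the result does not depend on the machine running it.

```python
import json
from typing import Any, Dict, Mapping


def convert_env_value(value: str) -> Any:
    """Convert a string to bool, number, JSON value, or leave it as a string."""
    if value.lower() in ("true", "false"):
        return value.lower() == "true"
    try:
        return float(value) if "." in value else int(value)
    except ValueError:
        pass
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return value


def collect_env(environ: Mapping[str, str], prefix: str) -> Dict[str, Any]:
    """Pick variables starting with PREFIX_ and strip the prefix from the key."""
    prefix = prefix.upper() + "_"
    return {
        key[len(prefix):].lower(): convert_env_value(value)
        for key, value in environ.items()
        if key.startswith(prefix)
    }


# Hypothetical environment used only for this illustration.
fake_env = {
    "APP_DEBUG": "true",
    "APP_MAX_WORKERS": "8",
    "APP_RATIO": "0.5",
    "APP_TAGS": '["a", "b"]',
    "APP_LOG_LEVEL": "DEBUG",
    "HOME": "/root",
}
settings = collect_env(fake_env, "app")
```

Note that keys are flat: a variable such as `APP_DATABASE_HOST` becomes the key `database_host`, not a nested `database.host` entry.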
@dataclass
class DatabaseConfig:
    """Example database configuration."""
    host: str = "localhost"
    port: int = 5432
    username: str = "admin"
    password: str = ""
    database: str = "myapp"
    ssl_enabled: bool = False
    pool_size: int = 10

@dataclass
class AppConfig:
    """Example application configuration."""
    debug: bool = False
    log_level: str = "INFO"
    secret_key: str = ""
    database: DatabaseConfig = field(default_factory=DatabaseConfig)
    redis_url: str = "redis://localhost:6379"
    max_workers: int = 4

async def demo_simple_config():
    """Demo simple configuration loading."""
    sample_config = {
        "debug": True,
        "log_level": "DEBUG",
        "secret_key": "dev-secret-key",
        "database": {
            "host": "localhost",
            "port": 5432,
            "username": "testuser",
            "password": "testpass",
            "database": "testdb"
        },
        "max_workers": 8
    }
    manager = AsyncConfigManager()
    manager.add_dict(sample_config, priority=0)
    config = await manager.load_config(AppConfig)
    print("=== Simple Configuration Demo ===")
    print(f"Debug mode: {config.debug}")
    print(f"Log level: {config.log_level}")
    print(f"Database host: {config.database.host}")
    print(f"Database port: {config.database.port}")
    print(f"Max workers: {config.max_workers}")
    return config
We define two example dataclasses, DatabaseConfig and AppConfig, to show how nested, typed configurations are structured. To demonstrate real usage, we write demo_simple_config(), which loads a basic dictionary into the config manager. The example shows how easily structured data maps onto type-safe Python objects, keeping configuration handling clean, readable, and maintainable.
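The mapping from nested dictionaries to nested dataclasses can be sketched on its own. The following is a minimal illustration under stated assumptions: `Database`, `App`, and `from_dict` are hypothetical names for this example, and the helper relies only on the standard library's `dataclasses.fields`, `is_dataclass`, and `typing.get_type_hints`. It is not the manager's validation routine, only a small instance of the same recursive idea.

```python
from dataclasses import dataclass, field, fields, is_dataclass
from typing import Any, Dict, get_type_hints


@dataclass
class Database:
    host: str = "localhost"
    port: int = 5432


@dataclass
class App:
    debug: bool = False
    database: Database = field(default_factory=Database)


def from_dict(cls, data: Dict[str, Any]):
    """Build a dataclass from a dict, recursing into nested dataclass fields."""
    hints = get_type_hints(cls)
    kwargs = {}
    for f in fields(cls):
        if f.name not in data:
            continue  # let the dataclass apply its own default
        value = data[f.name]
        if is_dataclass(hints[f.name]) and isinstance(value, dict):
            value = from_dict(hints[f.name], value)
        kwargs[f.name] = value
    return cls(**kwargs)


app = from_dict(App, {"debug": True, "database": {"host": "db.internal"}})
```

Here the nested `database` dict becomes a `Database` instance, and `port` keeps its default because the input does not mention it. Like the tutorial's code, this sketch does not check that values match their annotated types.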
async def demo_advanced_config():
    """Demo advanced configuration with multiple sources."""
    base_config = {
        "debug": False,
        "log_level": "INFO",
        "secret_key": "production-secret",
        "max_workers": 4
    }
    override_config = {
        "debug": True,
        "log_level": "DEBUG",
        "database": {
            "host": "dev-db.example.com",
            "port": 5433
        }
    }
    env_config = {
        "secret_key": "env-secret-key",
        "redis_url": "redis://prod-redis:6379"
    }
    print("\n=== Advanced Configuration Demo ===")
    manager = AsyncConfigManager()
    manager.add_dict(base_config, priority=0)
    manager.add_dict(override_config, priority=50)
    manager.add_dict(env_config, priority=100)
    config = await manager.load_config(AppConfig)
    print("Configuration sources merged:")
    print(f"Debug mode: {config.debug} (from override)")
    print(f"Log level: {config.log_level} (from override)")
    print(f"Secret key: {config.secret_key} (from env)")
    print(f"Database host: {config.database.host} (from override)")
    print(f"Redis URL: {config.redis_url} (from env)")
    return config

async def demo_validation():
    """Demo configuration validation."""
    print("\n=== Configuration Validation Demo ===")
    valid_config = {
        "debug": True,
        "log_level": "DEBUG",
        "secret_key": "test-key",
        "database": {
            "host": "localhost",
            "port": 5432
        }
    }
    manager = AsyncConfigManager()
    manager.add_dict(valid_config, priority=0)
    try:
        config = await manager.load_config(AppConfig)
        print("✓ Valid configuration loaded successfully")
        print(f"  Database SSL: {config.database.ssl_enabled} (default value)")
        print(f"  Database pool size: {config.database.pool_size} (default value)")
    except ValidationError as e:
        print(f"✗ Validation error: {e}")
    incomplete_config = {
        "debug": True,
        "log_level": "DEBUG"
    }
    manager2 = AsyncConfigManager()
    manager2.add_dict(incomplete_config, priority=0)
    try:
        config2 = await manager2.load_config(AppConfig)
        print("✓ Configuration with defaults loaded successfully")
        print(f"  Secret key: '{config2.secret_key}' (default empty string)")
    except ValidationError as e:
        print(f"✗ Validation error: {e}")
We demonstrate more advanced features through two examples. In demo_advanced_config(), we merge multiple configuration sources (base, override, and environment) by priority, with higher-priority sources taking precedence. This shows how flexibly environment-specific overrides can be managed. In demo_validation(), we load both a complete and a partial configuration. Missing fields are filled from defaults where available, and a ValidationError is raised when a required field has no value.
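What counts as a "required" field comes down to whether the dataclass field declares a default. A minimal sketch of that check, using the standard `dataclasses.MISSING` sentinel, is shown below; the `Service` class and the `required_fields` and `missing_required` helpers are hypothetical names introduced only for this illustration.

```python
from dataclasses import MISSING, dataclass, field, fields
from typing import List


@dataclass
class Service:
    name: str                                        # required: no default
    port: int = 8080                                 # plain default
    tags: List[str] = field(default_factory=list)    # factory default


def required_fields(cls) -> List[str]:
    """Names of fields that have neither a default nor a default factory."""
    return [
        f.name
        for f in fields(cls)
        if f.default is MISSING and f.default_factory is MISSING
    ]


def missing_required(cls, data: dict) -> List[str]:
    """Required field names that the given data does not provide."""
    return [name for name in required_fields(cls) if name not in data]


problems = missing_required(Service, {"port": 9000})
```

In the tutorial's AppConfig every field has a default, which is why the incomplete configuration in demo_validation() still loads. A field declared without a default, like `name` here, is the case that would lead to a validation error.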
async def run_demos():
    """Run all demonstration functions."""
    try:
        await demo_simple_config()
        await demo_advanced_config()
        await demo_validation()
        print("\n=== All demos completed successfully! ===")
    except Exception as e:
        print(f"Demo error: {e}")
        import traceback
        traceback.print_exc()

# In Jupyter/IPython, run the demos with top-level await instead:
# await run_demos()

if __name__ == "__main__":
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            print("Running in Jupyter/IPython environment")
            print("Use: await run_demos()")
        else:
            asyncio.run(run_demos())
    except RuntimeError:
        asyncio.run(run_demos())
Finally, run_demos() executes all the demonstration functions in sequence, covering simple loading, multi-source merging, and validation. Conditional logic at the entry point handles both Jupyter and standalone Python environments, so the same code can be used to showcase, test, and integrate the configuration system in different workflows.
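The environment check boils down to asking whether an event loop is already running. A compact way to express that, sketched here as an alternative rather than as the tutorial's own approach, is `asyncio.get_running_loop()`, which raises `RuntimeError` when called outside a running loop. The `in_running_loop` and `check_inside` names are hypothetical.

```python
import asyncio


def in_running_loop() -> bool:
    """Return True if called while an asyncio event loop is running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread (a plain script, for example).
        return False
    return True


async def check_inside() -> bool:
    # Called from a coroutine, so a loop is necessarily running.
    return in_running_loop()


outside = in_running_loop()            # evaluated at module level in a script
inside = asyncio.run(check_inside())   # evaluated inside a running loop
```

In a plain script, `asyncio.run(...)` is the appropriate entry point; in a notebook, where a loop is already running, the coroutine should be awaited directly instead.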
In conclusion, we have shown how AsyncConfig provides a robust and extensible foundation for managing configuration in modern Python applications. It makes it easy to combine multiple sources, validate configurations against typed schemas, and respond to file changes in real time. Whether we are building microservices, async backends, or CLI tools, the library offers a flexible, developer-friendly way to manage configuration.
Check out the Full Codes. All credit for this work goes to the researchers behind the project.
Asif Razzaq serves as the CEO at Marktechpost Media Inc. As an entrepreneur, Asif has a passion for harnessing Artificial Intelligence to benefit society. Marktechpost is his latest venture, a media platform that focuses on Artificial Intelligence. It is known for providing in-depth news coverage about machine learning, deep learning, and other topics. The content is technically accurate and easy to understand by an audience of all backgrounds. Over 2 million views per month are a testament to the platform’s popularity.


