Mirror of https://github.com/AUTOMATIC1111/stable-diffusion-webui.git
rework extensions metadata: use custom sorter that doesn't mess the order as much and ignores cyclic errors, use classes with named fields instead of dictionaries, eliminate some duplicated code
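The custom sorter mentioned in the commit title is not part of the hunks shown below (it lives elsewhere in the codebase). As a rough sketch of the behaviour described there, preserving the incoming order where possible and tolerating cycles instead of raising, a depth-first pass along these lines would do; the function name and signature here are illustrative, not the webui API:

# Illustrative sketch only: order-preserving, cycle-tolerant dependency sort.
# "dependencies" maps a name to the names it should come after; anything that
# would form a cycle is simply emitted when its own turn comes instead of erroring.
def topological_sort(dependencies):
    result = []
    visited = set()

    def inner(name):
        visited.add(name)

        for dep in dependencies.get(name, []):
            if dep in dependencies and dep not in visited:
                inner(dep)

        result.append(name)

    for name in dependencies:
        if name not in visited:
            inner(name)

    return result


print(topological_sort({"a": ["b"], "b": [], "c": ["a"]}))  # ['b', 'a', 'c']
print(topological_sort({"x": ["y"], "y": ["x"]}))           # cycle tolerated: ['y', 'x']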
--- a/modules/extensions.py
+++ b/modules/extensions.py
@@ -1,5 +1,6 @@
+from __future__ import annotations
+
 import configparser
-import functools
 import os
 import threading
 import re
@@ -8,7 +9,6 @@ from modules import shared, errors, cache, scripts
 from modules.gitpython_hack import Repo
 from modules.paths_internal import extensions_dir, extensions_builtin_dir, script_path  # noqa: F401
 
-extensions = []
 
 os.makedirs(extensions_dir, exist_ok=True)
 
@@ -22,13 +22,56 @@ def active():
     return [x for x in extensions if x.enabled]
 
 
+class ExtensionMetadata:
+    filename = "metadata.ini"
+    config: configparser.ConfigParser
+    canonical_name: str
+    requires: list
+
+    def __init__(self, path, canonical_name):
+        self.config = configparser.ConfigParser()
+
+        filepath = os.path.join(path, self.filename)
+        if os.path.isfile(filepath):
+            try:
+                self.config.read(filepath)
+            except Exception:
+                errors.report(f"Error reading {self.filename} for extension {canonical_name}.", exc_info=True)
+
+        self.canonical_name = self.config.get("Extension", "Name", fallback=canonical_name)
+        self.canonical_name = self.canonical_name.lower().strip()
+
+        self.requires = self.get_script_requirements("Requires", "Extension")
+
+    def get_script_requirements(self, field, section, extra_section=None):
+        """reads a list of requirements from the config; field is the name of the field in the ini file,
+        like Requires or Before, and section is the name of the [section] in the ini file; additionally,
+        reads more requirements from [extra_section] if specified."""
+
+        x = self.config.get(section, field, fallback='')
+
+        if extra_section:
+            x = x + ', ' + self.config.get(extra_section, field, fallback='')
+
+        return self.parse_list(x.lower())
+
+    def parse_list(self, text):
+        """converts a line from config ("ext1 ext2, ext3 ") into a python list (["ext1", "ext2", "ext3"])"""
+
+        if not text:
+            return []
+
+        # both "," and " " are accepted as separator
+        return [x for x in re.split(r"[,\s]+", text.strip()) if x]
+
+
 class Extension:
     lock = threading.Lock()
     cached_fields = ['remote', 'commit_date', 'branch', 'commit_hash', 'version']
+    metadata: ExtensionMetadata
 
-    def __init__(self, name, path, enabled=True, is_builtin=False, canonical_name=None):
+    def __init__(self, name, path, enabled=True, is_builtin=False, metadata=None):
         self.name = name
-        self.canonical_name = canonical_name or name.lower()
         self.path = path
         self.enabled = enabled
         self.status = ''
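The ExtensionMetadata class added above reads an [Extension] section with Name and Requires keys. A self-contained sketch of the same parsing, using an invented metadata.ini, shows what canonical_name and requires end up holding (the extension names are made up for illustration):

# Standalone illustration of the parsing done by ExtensionMetadata above.
# The ini content is invented; the section and key names ("Extension", "Name",
# "Requires") are the ones the class reads.
import configparser
import re

ini_text = """
[Extension]
Name = My-Extension
Requires = sd-webui-controlnet, another-extension
"""

config = configparser.ConfigParser()
config.read_string(ini_text)

canonical_name = config.get("Extension", "Name", fallback="directory-name-fallback").lower().strip()

# same splitting rule as parse_list(): "," and whitespace both separate entries
requires = [x for x in re.split(r"[,\s]+", config.get("Extension", "Requires", fallback="").lower().strip()) if x]

print(canonical_name)  # my-extension
print(requires)        # ['sd-webui-controlnet', 'another-extension']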
@@ -40,18 +83,8 @@ class Extension:
         self.branch = None
         self.remote = None
         self.have_info_from_repo = False
-
-    @functools.cached_property
-    def metadata(self):
-        if os.path.isfile(os.path.join(self.path, "metadata.ini")):
-            try:
-                config = configparser.ConfigParser()
-                config.read(os.path.join(self.path, "metadata.ini"))
-                return config
-            except Exception:
-                errors.report(f"Error reading metadata.ini for extension {self.canonical_name}.",
-                              exc_info=True)
-        return None
+        self.metadata = metadata if metadata else ExtensionMetadata(self.path, name.lower())
+        self.canonical_name = self.metadata.canonical_name
 
     def to_dict(self):
         return {x: getattr(self, x) for x in self.cached_fields}
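With the cached_property gone, metadata is now built by the caller and injected through the constructor. A hypothetical usage inside the webui repo (directory name invented) would look like:

# Hypothetical usage; assumes it runs inside the webui repo so "modules" imports,
# and the extension directory name is invented for illustration.
from modules.extensions import Extension, ExtensionMetadata

path = "extensions/sd-webui-example"
metadata = ExtensionMetadata(path, "sd-webui-example")  # reads metadata.ini if present
extension = Extension(name="sd-webui-example", path=path, enabled=True, is_builtin=False, metadata=metadata)

print(extension.canonical_name)     # taken from the metadata object
print(extension.metadata.requires)  # parsed Requires list, [] without a metadata.ini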
@@ -162,7 +195,7 @@ def list_extensions():
     elif shared.opts.disable_all_extensions == "extra":
         print("*** \"Disable all extensions\" option was set, will only load built-in extensions ***")
 
-    extension_dependency_map = {}
+    loaded_extensions = {}
 
     # scan through extensions directory and load metadata
     for dirname in [extensions_builtin_dir, extensions_dir]:
@@ -175,55 +208,30 @@ def list_extensions():
                 continue
 
             canonical_name = extension_dirname
-            requires = None
-
-            if os.path.isfile(os.path.join(path, "metadata.ini")):
-                try:
-                    config = configparser.ConfigParser()
-                    config.read(os.path.join(path, "metadata.ini"))
-                    canonical_name = config.get("Extension", "Name", fallback=canonical_name)
-                    requires = config.get("Extension", "Requires", fallback=None)
-                except Exception:
-                    errors.report(f"Error reading metadata.ini for extension {extension_dirname}. "
-                                  f"Will load regardless.", exc_info=True)
-
-            canonical_name = canonical_name.lower().strip()
+            metadata = ExtensionMetadata(path, canonical_name)
 
             # check for duplicated canonical names
-            if canonical_name in extension_dependency_map:
-                errors.report(f"Duplicate canonical name \"{canonical_name}\" found in extensions "
-                              f"\"{extension_dirname}\" and \"{extension_dependency_map[canonical_name]['dirname']}\". "
-                              f"The current loading extension will be discarded.", exc_info=False)
+            already_loaded_extension = loaded_extensions.get(metadata.canonical_name)
+            if already_loaded_extension is not None:
+                errors.report(f'Duplicate canonical name "{canonical_name}" found in extensions "{extension_dirname}" and "{already_loaded_extension.name}". Former will be discarded.', exc_info=False)
                 continue
 
-            # both "," and " " are accepted as separator
-            requires = list(filter(None, re.split(r"[,\s]+", requires.lower()))) if requires else []
-
-            extension_dependency_map[canonical_name] = {
-                "dirname": extension_dirname,
-                "path": path,
-                "requires": requires,
-            }
+            is_builtin = dirname == extensions_builtin_dir
+            extension = Extension(name=extension_dirname, path=path, enabled=extension_dirname not in shared.opts.disabled_extensions, is_builtin=is_builtin, metadata=metadata)
+            extensions.append(extension)
+            loaded_extensions[canonical_name] = extension
 
     # check for requirements
-    for (_, extension_data) in extension_dependency_map.items():
-        dirname, path, requires = extension_data['dirname'], extension_data['path'], extension_data['requires']
-        requirement_met = True
-        for req in requires:
-            if req not in extension_dependency_map:
-                errors.report(f"Extension \"{dirname}\" requires \"{req}\" which is not installed. "
-                              f"The current loading extension will be discarded.", exc_info=False)
-                requirement_met = False
-                break
-            dep_dirname = extension_dependency_map[req]['dirname']
-            if dep_dirname in shared.opts.disabled_extensions:
-                errors.report(f"Extension \"{dirname}\" requires \"{dep_dirname}\" which is disabled. "
-                              f"The current loading extension will be discarded.", exc_info=False)
-                requirement_met = False
-                break
+    for extension in extensions:
+        for req in extension.metadata.requires:
+            required_extension = loaded_extensions.get(req)
+            if required_extension is None:
+                errors.report(f'Extension "{extension.name}" requires "{req}" which is not installed.', exc_info=False)
+                continue
 
-        is_builtin = dirname == extensions_builtin_dir
-        extension = Extension(name=dirname, path=path,
-                              enabled=dirname not in shared.opts.disabled_extensions and requirement_met,
-                              is_builtin=is_builtin)
-        extensions.append(extension)
+            if not required_extension.enabled:
+                errors.report(f'Extension "{extension.name}" requires "{required_extension.name}" which is disabled.', exc_info=False)
+                continue
+
+
+extensions: list[Extension] = []
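Since the requirement pass now walks Extension objects keyed by canonical name instead of a dict of dicts, its logic can be modelled standalone with a couple of stand-in objects (the class and names below are simplified stand-ins, not the webui types):

# Stripped-down model of the requirement check above; FakeExtension is a stand-in.
from dataclasses import dataclass, field

@dataclass
class FakeExtension:
    name: str
    enabled: bool = True
    requires: list = field(default_factory=list)

loaded_extensions = {
    "base": FakeExtension("base", enabled=False),
    "addon": FakeExtension("addon", requires=["base", "missing-one"]),
}

for extension in loaded_extensions.values():
    for req in extension.requires:
        required_extension = loaded_extensions.get(req)
        if required_extension is None:
            print(f'Extension "{extension.name}" requires "{req}" which is not installed.')
        elif not required_extension.enabled:
            print(f'Extension "{extension.name}" requires "{required_extension.name}" which is disabled.')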