forked from jurev/knockout
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
156 lines (119 loc) · 4.46 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""
Knockout is a set of modules that extend Python's import mechanisms in various ways.
See PEP 302 (http://www.python.org/dev/peps/pep-0302/) for more info.
"""
import sys, os, re
import imp
import logging
# Custom numeric log level, registered with the logging module under the
# name "SOURCE". Its value (5) is lower than logging.DEBUG (10).
SOURCE = 5
logging.addLevelName(SOURCE, "SOURCE")
# NOTE(review): this configures the root logger as a side effect of importing
# the package -- confirm that is intended for library use.
logging.basicConfig(level=logging.DEBUG)
# Root logger; the rest of this module logs through `log`.
log = logging.getLogger()
class Loader:
    """ A basic module loader that executes already-fetched source text.

    Attributes:
        source:   source code of the module to execute.
        fullpath: value stored in the module's ``__file__``.
        ispkg:    true when the module is a package.
        importer: optional importer; when given (and ``ispkg`` is true) its
                  ``join`` and ``path`` are used to build ``__path__``.
    """

    def __init__(self, source, fullpath, ispkg, importer=None):
        self.source = source
        self.fullpath = fullpath
        self.ispkg = ispkg
        self.importer = importer

    def load_module(self, fullname):
        """ Add the new module to sys.modules,
        execute its source and return it.

        If a module named ``fullname`` is already in sys.modules it is
        reused (its namespace is re-executed in place).  When executing the
        source raises, the exception is propagated; the sys.modules entry is
        removed only if this call created it, so a module that existed
        before the call is left alone.
        """
        # Remember whether we are the ones inserting the module, so that a
        # failure does not evict a module that was already imported.
        preexisting = fullname in sys.modules
        if preexisting:
            mod = sys.modules[fullname]
        else:
            mod = sys.modules[fullname] = imp.new_module(fullname)
        mod.__file__ = self.fullpath
        mod.__loader__ = self
        if self.ispkg and self.importer:
            # Packages need __path__ so that submodules can be located.
            mod.__path__ = [self.importer.join(self.importer.path, fullname.split(".")[-1])]
        #for line in self.source.split('\n'):
        #    log.log(SOURCE, "%s %s" %('|>|', line))
        log.debug("load_module: executing %s's source..." % fullname)
        try:
            # Tuple form: equivalent to ``exec source in globals`` on
            # Python 2 and also valid syntax on Python 3.
            exec(self.source, mod.__dict__)
        except:
            # Bare except on purpose: clean up for *any* failure, then
            # re-raise it unchanged.
            if not preexisting:
                sys.modules.pop(fullname, None)
            raise
        # Look the module up again: the executed code may have replaced its
        # own sys.modules entry.
        return sys.modules[fullname]
class ImporterMeta(type):
    """ A metaclass for all importers.

    The first class created with this metaclass (one that has no
    ``importers`` attribute yet) receives a fresh, empty ``importers`` list.
    Every class created afterwards that can already see that attribute
    appends itself to the list.
    """

    def __init__(cls, name, bases, attrs):
        try:
            registry = cls.importers
        except AttributeError:
            # No registry visible yet: this class becomes its owner.
            cls.importers = []
        else:
            registry.append(cls)
class Importer:
    """ The base for all other importers.
    Implements the standard Python import mechanism.
    As a master importer, this class has access to all available importers.

    Instances are created with a single path string.  The path is accepted
    only if it matches ``re_fullpath``; the named groups of the match
    (``path`` for this base class) become instance attributes.
    """
    __metaclass__ = ImporterMeta

    # Pattern a path item must match to be handled by this importer.
    re_fullpath = re.compile(r'^(?P<path>.+)$')

    # -- private stuff

    def __init__(self, path):
        """ Accept ``path`` if it matches ``re_fullpath``.

        Raises ImportError when the path does not match.
        """
        m = self.re_fullpath.match(path)
        if not m:
            self.debug("rejecting path item: '%s'" % path)
            raise ImportError("%s cannot handle path item: '%s'"
                              % (self.__class__.__name__, path))
        # Expose the named regex groups (e.g. ``path``) as attributes.
        self.__dict__.update(m.groupdict())
        self.debug("accepting '%s'." % path)

    @classmethod
    def register(cls):
        """ Register this class as an active importer.

        Appends the class to sys.path_hooks.  Returns False (after logging
        a warning) if it is already there, None otherwise.
        """
        if cls in sys.path_hooks:
            log.warning("%s: I am already registered." % cls)
            return False
        sys.path_hooks.append(cls)

    @classmethod
    def unregister(cls):
        """ Unregister this class. It will no longer be able to import modules.

        Removes the class from sys.path_hooks.  Returns False (after logging
        a warning) if it was not registered, None otherwise.
        """
        if cls not in sys.path_hooks:
            log.warning("%s: I am not registered." % cls)
            return False
        # list.remove, not the non-existent list.delete.
        sys.path_hooks.remove(cls)

    def find_module(self, fullname, mpath=None):
        """ Return a loader for ``fullname``, or None if none can be built.

        The name is tried first as a plain module and then as a package.
        Any exception raised while fetching the source or building the
        loader is logged at debug level and treated as "not found".
        """
        for ispkg in (False, True):
            kind = 'PKG' if ispkg else 'MOD'
            try:
                source, fullpath = self.get_source(fullname, ispkg=ispkg)
                loader = self.get_loader(source, fullpath, ispkg)
            except Exception as e:
                self.debug("find_module: %s: failed to get '%s'. (%s)" % (kind, fullname, e))
            else:
                self.debug("find_module: %s: got '%s'. mpath=%s" % (kind, fullname, mpath))
                return loader
        return None

    def debug(self, str):
        """ Log ``str`` at debug level, prefixed with the class name. """
        # The parameter name shadows the builtin but is kept for callers
        # that pass it by keyword.
        log.debug("%s: %s" % (self.__class__.__name__, str))

    # -- stuff to override

    def fullpath(self, fullname, ispkg):
        """ Build the source location for the last component of ``fullname``.

        Packages map to ``<path>/<name>/__init__.py``, plain modules to
        ``<path>/<name>.py``, where the leading part is built with
        ``self.join``.
        """
        base = self.join(self.path, fullname.split(".")[-1])
        if ispkg:
            return base + "/__init__.py"
        return base + ".py"

    def join(self, *parts):
        """ Join path components (os.path.join in this base class). """
        return os.path.join(*parts)

    def get_source(self, fullname, ispkg):
        """ Get the source for the new module to be loaded.

        Returns a ``(source, fullpath)`` tuple; CRLF line endings in the
        source are converted to LF.  Errors from opening or reading the file
        propagate to the caller.
        """
        fullpath = self.fullpath(fullname, ispkg)
        # Context manager so the file handle is closed deterministically.
        with open(fullpath) as f:
            source = f.read()
        return source.replace("\r\n", "\n"), fullpath

    def get_loader(self, source, fullpath, ispkg):
        """ Get the loader instance to load the new module.
        """
        return Loader(source, fullpath, ispkg, importer=self)
# register The Hook
def register():
    """ Register the base Importer and log a notice when that succeeds.

    Nothing further is logged here when Importer.register() reports
    failure by returning False.
    """
    outcome = Importer.register()
    if outcome == False:
        # Importer.register() returns False when it did not register.
        return
    for notice in ("Custom importer enabled.",
                   "This stuff is experimental, use at your own risk. Enjoy."):
        log.info(notice)
def clear_cache():
    """ Remove every entry from sys.path_importer_cache. """
    importer_cache = sys.path_importer_cache
    importer_cache.clear()
#register()