Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a server that can simulate a real powerwall #63

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,5 @@ dmypy.json

# MacOS finder stuff
.DS_Store

.server
105 changes: 105 additions & 0 deletions examples/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import aiofiles
from aiohttp import web
import json
import ssl
import os
import sys
from OpenSSL import crypto

SAMPLE_FILES = "samples/running"
CERT_FILE = "selfsigned.crt"
KEY_FILE = "private.key"

routes = web.RouteTableDef()


@routes.post("/api/login/Basic")
async def login(request):
await request.post()

response = web.json_response(
{
"firstname": "firstname",
"lastname": "lastname",
"token": "token",
"roles": ["Home_Owner"],
"loginTime": "loginTime",
}
)

response.set_cookie("AuthCookie", "blah")

return response


@routes.get("/api/{api:.*}")
async def api(request):
if "AuthCookie" not in request.cookies:
raise web.HTTPUnauthorized()

filename = f"{request.match_info['api'].replace('/', '.')}.json"
path = os.path.join(SAMPLE_FILES, filename)

async with aiofiles.open(path, mode="r") as f:
body = await f.read()

return web.json_response(json.loads(body))


async def make_app():
app = web.Application()
app.add_routes(routes)
return app


def create_self_signed_cert():
emailAddress = "emailAddress"
commonName = "commonName"
countryName = "NT"
localityName = "localityName"
stateOrProvinceName = "stateOrProvinceName"
organizationName = "organizationName"
organizationUnitName = "organizationUnitName"
serialNumber = 0
validityStartInSeconds = 0
validityEndInSeconds = 10 * 365 * 24 * 60 * 60
# create a key pair
k = crypto.PKey()
k.generate_key(crypto.TYPE_RSA, 4096)
# create a self-signed cert
cert = crypto.X509()
cert.get_subject().C = countryName
cert.get_subject().ST = stateOrProvinceName
cert.get_subject().L = localityName
cert.get_subject().O = organizationName
cert.get_subject().OU = organizationUnitName
cert.get_subject().CN = commonName
cert.get_subject().emailAddress = emailAddress
cert.set_serial_number(serialNumber)
cert.gmtime_adj_notBefore(validityStartInSeconds)
cert.gmtime_adj_notAfter(validityEndInSeconds)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(k)
cert.sign(k, "sha512")
with open(CERT_FILE, "wt") as f:
f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8"))
with open(KEY_FILE, "wt") as f:
f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode("utf-8"))


if not os.path.exists(CERT_FILE) or not os.path.exists(KEY_FILE):
create_self_signed_cert()

ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(CERT_FILE, KEY_FILE)

if not os.path.exists(SAMPLE_FILES):
print(f"Missing sample files in {SAMPLE_FILES}, create them:")
print(
"curl -o getsamples.sh https://raw.githubusercontent.com/vloschiavo/powerwall2/master/scripts/getsamples.sh"
)
print("chmod u+x getsamples.sh")
print("./getsamples.sh youraddress youremail yourpass .server/samples")
sys.exit(-1)

web.run_app(make_app(), ssl_context=ssl_context)
6 changes: 6 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ commands = python -m unittest discover tests/integration
[testenv:example]
passenv = POWERWALL_IP,POWERWALL_PASSWORD
commands = python examples/example.py

[testenv:server]
changedir = .server
commands =
pip install pyOpenSSL aiohttp_session aiofiles
bubonicbob marked this conversation as resolved.
Show resolved Hide resolved
python {tox_root}/examples/server.py