Skip to content

Commit

Permalink
Add support for new handshake protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
watfordjc committed Nov 20, 2020
1 parent 630c777 commit 4bf4c8f
Showing 1 changed file with 118 additions and 1 deletion.
119 changes: 118 additions & 1 deletion tplink_smartplug.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@
import socket
from struct import pack

import binascii
import socket
import requests

# pycryptodome
from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Util import Counter, Padding

import logging
import hashlib
import time
import json

import http.client as http_client

version = 0.4

# Check if hostname is valid
Expand Down Expand Up @@ -85,6 +101,97 @@ def decrypt(string):
result += chr(a)
return result

# New TP-Link HS110 handshake protocol
# Classes EncryptionSession and Handshake, and function encrypt2 based on:
# https://gist.github.com/chriswheeldon/3b17d974db3817613c69191c0480fe55

class EncryptionSession:
def __init__(self, local_seed, remote_seed, user_hash):
self._key = self._key_derive(local_seed, remote_seed, user_hash)
(self._iv, self._seq) = self._iv_derive(local_seed, remote_seed, user_hash)
self._sig = self._sig_derive(local_seed, remote_seed, user_hash)

def _key_derive(self, local_seed, remote_seed, user_hash):
payload = 'lsk'.encode('utf-8') + local_seed + remote_seed + user_hash
return hashlib.sha256(payload).digest()[:16]

def _iv_derive(self, local_seed, remote_seed, user_hash):
# iv is first 16 bytes of sha256, where the last 4 bytes forms the
# sequence number used in requests and is incremented on each request
payload = 'iv'.encode('utf-8') + local_seed + remote_seed + user_hash
iv = hashlib.sha256(payload).digest()[:16]
return (iv[:12], (int.from_bytes(iv[12:16], 'big') & 0x7fffffff))

def _sig_derive(self, local_seed, remote_seed, user_hash):
# used to create a hash with which to prefix each request
payload = 'ldk'.encode('utf-8') + local_seed + remote_seed + user_hash
return hashlib.sha256(payload).digest()[:28]

def iv(self):
seq = self._seq.to_bytes(4, 'big')
iv = self._iv + seq
assert(len(iv) == 16)
return iv

def encrypt(self, msg):
self._seq = self._seq + 1
if (type(msg) == str):
msg = msg.encode('utf-8')
assert(type(msg) == bytes)
cipher = AES.new(self._key, AES.MODE_CBC, self.iv())
ciphertext = cipher.encrypt(Padding.pad(msg, AES.block_size))
signature = hashlib.sha256(self._sig + self._seq.to_bytes(4, 'big') + ciphertext).digest()
return (signature + ciphertext, self._seq)

def decrypt(self, msg):
assert(type(msg) == bytes)
cipher = AES.new(self._key, AES.MODE_CBC, self.iv())
plaintext = Padding.unpad(cipher.decrypt(msg[32:]), AES.block_size)
return plaintext

class Handshake:
def __init__(self, ip):
self.ip = ip
self.local_seed = Random.get_random_bytes(16)

def user_hash(self):
# md5(md5(email) + md5(pass))
# device is not connected to tplink cloud i.e. email and pass are empty
# may need to include your email and password below if app/plug are associated with a tplink account?
# i.e. user_hash = hashlib.md5(b'<email>').digest() + hashlib.md5(b'<pass>').digest()
user_hash = hashlib.md5(b'').digest() + hashlib.md5(b'').digest()
return hashlib.md5(user_hash).digest()

def perform(self, http_session):
# step 1 - send our seed
result = http_session.post('http://{}:80/app/handshake1'.format(self.ip), data=self.local_seed)
assert(result.status_code == 200)
body = result.content
self.remote_seed = body[:16]
assert(hashlib.sha256(self.local_seed + self.user_hash()).digest() == body[16:]) # device responds with hash of seed + user hash

# step 2 - send hash of remote seed + user hash
payload = hashlib.sha256(self.remote_seed + self.user_hash()).digest()
result = s.post('http://{}:80/app/handshake2'.format(self.ip), data=payload)
assert(result.status_code == 200)

return EncryptionSession(self.local_seed, self.remote_seed, self.user_hash())

def encrypt2(session, ip, string):
handshake = Handshake(ip)
retry = 0

while retry < 9:
encryption = handshake.perform(session)
(msg, seq) = encryption.encrypt(string)
res = session.post('http://{}:80/app/request'.format(ip), params={'seq': seq}, data=msg)
if res.status_code == 200:
break
retry += 1
time.sleep(0.25)
assert(res.status_code == 200)
return(encryption.decrypt(res.content).decode("utf-8"))


# Parse commandline arguments
parser = argparse.ArgumentParser(description=f"TP-Link Wi-Fi Smart Plug Client v{version}")
Expand Down Expand Up @@ -132,4 +239,14 @@ def decrypt(string):
print("Received: ", decrypted)

except socket.error:
quit(f"Could not connect to host {ip}:{port}")
try:
session = requests.Session()
port = 80
response = encrypt2(session, ip, cmd)
if args.quiet:
print(response)
else:
print("Sent: ", cmd)
print("Received: ", response)
except socket.error:
quit(f"Could not connect to host {ip}:{port}")

0 comments on commit 4bf4c8f

Please sign in to comment.