Skip to content

Commit

Permalink
Merge pull request #9 from ole1986/clearpos
Browse files Browse the repository at this point in the history
Clear stop position support and allow database manipulations
  • Loading branch information
ole authored Apr 11, 2020
2 parents a985a31 + 60bb490 commit b043ae8
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 60 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
**v0.7**

- command to move up/down for a specific delay `--send DOWN|UP:<delay>` until release button is simulated (UNTESTED)
- command to clear stop positions `--send CLEARPOS`
- provided `--add <modifier>` and `--remove <code>` option to add or remove units
- support for `--channel 0` (zero) allowing to master wall mounted senders (overrides CODE_REMOTE)

**v0.6**

- added two more units ("1737e" and "1737f") counting a total of 5 units (35 possible channels)
Expand Down
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This project automates "Becker Antriebe" shutter also known as CC31/CC51 using the Centronic USB Stick V2

```
./centronic-stick.py [-hlst] [--checksum <code>] [--device <device>] [--send <UP|UP2|DOWN|DOWN2|HALT|TRAIN|REMOVE> --channel <[unit:]channel>]
./centronic-stick.py [-hlst] [--checksum <code>] [--device <device>] [--add <modifier>] [--mod <modifier>] [--remove <code>] [--send <UP|UP2|DOWN|DOWN2|HALT|DOWN:<delay>|UP:<delay>|CLEARPOS|TRAIN|TRAINMASTER|REMOVE> --channel <[unit:]channel>]
This script is used send command codes to CC11/CC51 compatible receivers through the CentronicControl USB Stick
It is necessary to own such USB device and to PAIR it first, before using commands like UP and DOWN
Expand All @@ -12,14 +12,16 @@ It is necessary to own such USB device and to PAIR it first, before using comman
-l: listen on the centronic USB device to fetch the codes
-s: display the current db stats (incl. last run of a unit)
-t: test mode - no codes will be send and no numbers consumed / works only with '--send'
--send <command>: submit a completely generated code for UP/UP2/DOWN/DOWN2/HALT/TRAIN/REMOVE commands / requires '--channel'
--send <command>: submit a completely generated code for UP/UP2/DOWN/DOWN2/HALT/DOWN:<delay>/UP:<DELAY>/CLEARPOS/TRAIN/REMOVE commands / requires '--channel'
While UP2 and DOWN2 are the intermediate position (E.g. sun protection)
--device <device>: set the device if it differs from the default, also host:port possible (ser2net)
--channel <[unit:]channel>: define the unit (1-5) and channel (1-7) being used for '--send'. Example: 2:15 will close shutter for unit 2 on all channels
--checksum <code>: add a checksum to the given 40 char code and output (without STX, ETX)
--mod <modifier>: used to manipulate the db entries - FOR VERY ADVANCED USERS
--mod <modifier>: used to manipulate the db entries
--add <modifier>: used to add a db entry
--remove <code>: used to remove an entry from db
Version 0.6 - Authors: ole1986, toolking
Version 0.7 - Authors: ole1986, toolking
```

### INSTALLATION
Expand Down
205 changes: 149 additions & 56 deletions centronic-stick.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import sqlite3
import socket
import serial # pyserial module required
import time
import re
import enum

# <STX> and <ETX> for every single code being send
STX = b'\x02'
Expand All @@ -21,16 +24,27 @@

COMMAND_HALT = 0x10 # stop
COMMAND_UP = 0x20 # move up
COMMAND_UP2 = 0x24 # intermediate position "up"
COMMAND_UP2 = 0x21 # move up
COMMAND_UP3 = 0x22 # move up
COMMAND_UP4 = 0x23 # move up
COMMAND_UP5 = 0x24 # intermediate position "up"
COMMAND_DOWN = 0x40 # move down
COMMAND_DOWN2 = 0x44 # intermediate position "down" (sun protection)
COMMAND_DOWN2 = 0x41 # move down
COMMAND_DOWN3 = 0x42 # move down
COMMAND_DOWN4 = 0x43 # move down
COMMAND_DOWN5 = 0x44 # intermediate position "down" (sun protection)
COMMAND_PAIR = 0x80 # pair button press
COMMAND_PAIR2 = 0x81 # pair button pressed for 3 seconds (without releasing)
COMMAND_PAIR3 = 0x82 # pair button pressed for 6 seconds (without releasing)
COMMAND_PAIR4 = 0x83 # pair button pressed for 10 seconds (without releasing)

COMMAND_CLEARPOS = 0x90
COMMAND_CLEARPOS2 = 0x91
COMMAND_CLEARPOS3 = 0x92
COMMAND_CLEARPOS4 = 0x93

def showhelp():
print('%s [-hlst] [--checksum <code>] [--device <device>] [--send <UP|UP2|DOWN|DOWN2|HALT|TRAIN|REMOVE> --channel <[unit:]channel>]' % sys.argv[0])
print('%s [-hlst] [--checksum <code>] [--device <device>] [--add <modifier>] [--mod <modifier>] [--remove <code>] [--send <UP|UP2|DOWN|DOWN2|HALT|DOWN:<delay>|UP:<delay>|CLEARPOS|TRAIN|TRAINMASTER|REMOVE> --channel <[unit:]channel>]' % sys.argv[0])
print('')
print('This script is used send command codes to CC11/CC51 compatible receivers through the CentronicControl USB Stick')
print('It is necessary to own such USB device and to PAIR it first, before using commands like UP and DOWN')
Expand All @@ -39,14 +53,16 @@ def showhelp():
print(" -l: listen on the centronic USB device to fetch the codes")
print(" -s: display the current db stats (incl. last run of a unit)")
print(" -t: test mode - no codes will be send and no numbers consumed / works only with '--send'")
print(" --send <command>: submit a completely generated code for UP/UP2/DOWN/DOWN2/HALT/TRAIN/REMOVE commands / requires '--channel'")
print(" --send <command>: submit a completely generated code for UP/UP2/DOWN/DOWN2/HALT/DOWN:<delay>/UP:<DELAY>/CLEARPOS/TRAIN/REMOVE commands / requires '--channel'")
print(" While UP2 and DOWN2 are the intermediate position (E.g. sun protection)")
print(" --device <device>: set the device if it differs from the default, also host:port possible (ser2net)")
print("--channel <[unit:]channel>: define the unit (1-5) and channel (1-7) being used for '--send'. Example: 2:15 will close shutter for unit 2 on all channels")
print(" --checksum <code>: add a checksum to the given 40 char code and output (without STX, ETX)")
print(" --mod <modifier>: used to manipulate the db entries - FOR VERY ADVANCED USERS")
print(" --mod <modifier>: used to manipulate the db entries")
print(" --add <modifier>: used to add a db entry")
print(" --remove <code>: used to remove an entry from db")
print('')
print('Version 0.6 - Authors: ole1986, toolking')
print('Version 0.7 - Authors: ole1986, toolking')

class Database:

Expand Down Expand Up @@ -137,6 +153,25 @@ def get_all_units(self):

return result

def get_rowid_from_unit(self, code, create=True):
c = self.conn.cursor()
res = c.execute('SELECT rowid FROM unit WHERE code = ?', (code,))

result = res.fetchone()
rowid = result[0] if result is not None else -1

return rowid

def add_unit(self, unit):
c = self.conn.cursor()
c.execute("INSERT INTO unit VALUES (?, ?, ?, ?)", (unit[0], int(unit[1]), int(unit[2]), 0,))
self.conn.commit()

def remove_unit(self, code):
c = self.conn.cursor()
c.execute("DELETE FROM unit WHERE code = ?", (code,))
self.conn.commit()

def set_unit(self, unit, test=False):
c = self.conn.cursor()
last_run = int(time.time())
Expand Down Expand Up @@ -187,13 +222,17 @@ def send(self, cmd, channel, test=False):
b = channel.split(':')
if len(b) > 1:
ch = int(b[1])
un = int(b[0])
if len(b[0]) == 5:
# fetch correct rowid from a given unit
un = self.db.get_rowid_from_unit(b[0])
else:
un = int(b[0])
else:
ch = int(channel)
un = 1

if not 1 <= ch <= 7 and ch != 15:
print("Channel must be in range of 1-7 or 15")
if not 0 <= ch <= 7 and ch != 15:
print("Channel must be in range of 1-7 or 15 for all and 0 for no channel")
return

if not self.device:
Expand All @@ -212,21 +251,34 @@ def send(self, cmd, channel, test=False):
self.runcodes(ch, unit, cmd, test)

def runcodes(self, channel, unit, cmd, test):
if unit[2] == 0 and cmd != "TRAIN":
if unit[2] == 0 and cmd != "TRAIN" and cmd != "TRAINMASTER":
print("The unit %s is not configured" % (unit[0]))
return

# move up/down dependent on given time
mt = re.match(r"(DOWN|UP):(\d+)", cmd)

codes = []
if cmd == "UP":
codes.append(self.generatecode(channel, unit, COMMAND_UP))
elif cmd == "UP2":
codes.append(self.generatecode(channel, unit, COMMAND_UP2))
codes.append(self.generatecode(channel, unit, COMMAND_UP5))
elif cmd == "HALT":
codes.append(self.generatecode(channel, unit, COMMAND_HALT))
elif cmd == "DOWN":
codes.append(self.generatecode(channel, unit, COMMAND_DOWN))
elif cmd == "DOWN2":
codes.append(self.generatecode(channel, unit, COMMAND_DOWN2))
codes.append(self.generatecode(channel, unit, COMMAND_DOWN5))
elif cmd == "CLEARPOS":
codes.append(self.generatecode(channel, unit, COMMAND_PAIR))
unit[1] += 1
codes.append(self.generatecode(channel, unit, COMMAND_CLEARPOS))
unit[1] += 1
codes.append(self.generatecode(channel, unit, COMMAND_CLEARPOS2))
unit[1] += 1
codes.append(self.generatecode(channel, unit, COMMAND_CLEARPOS3))
unit[1] += 1
codes.append(self.generatecode(channel, unit, COMMAND_CLEARPOS4))
elif cmd == "TRAIN":
codes.append(self.generatecode(channel, unit, COMMAND_PAIR))
unit[1] += 1
Expand All @@ -249,12 +301,33 @@ def runcodes(self, channel, unit, cmd, test):
codes.append(self.generatecode(channel, unit, COMMAND_PAIR3))
unit[1] += 1
codes.append(self.generatecode(channel, unit, COMMAND_PAIR4))
elif cmd == "TRAINMASTER":
codes.append(self.generatecode(channel, unit, COMMAND_PAIR))
unit[1] += 1
codes.append(self.generatecode(channel, unit, COMMAND_PAIR2))
unit[1] += 1
codes.append(self.generatecode(channel, unit, COMMAND_PAIR3))
unit[1] += 1
codes.append(self.generatecode(channel, unit, COMMAND_PAIR4))
# set unit as configured
unit[2] = 1

unit[1] += 1
if mt:
print("Moving %s for %s seconds..." % (mt.group(1), mt.group(2)))
# move down/up for a specific time
if mt.group(1) == "UP":
code = self.generatecode(channel, unit, COMMAND_UP)
elif mt.group(1) == "DOWN":
code = self.generatecode(channel, unit, COMMAND_DOWN)

unit[1] += 1
self.write([code],test)
time.sleep(int(mt.group(2)))
else:
unit[1] += 1

# append the release button code
codes.append(self.generatecode(channel, unit, 0))

unit[1] += 1

self.write(codes,test)
Expand All @@ -276,7 +349,11 @@ def generatecode(self, channel, unit, cmd_code, with_checksum=True):
unitId = unit[0] # contains the unit code in hex (5 chars)
unitIncrement = unit[1] # contains the next increment (required to convert into hex4)

code = CODE_PREFIX + hex4(unitIncrement) + CODE_SUFFIX + unitId + CODE_21 + CODE_REMOTE + hex2(channel) + '00' + hex2(cmd_code)
if channel == 0:
# channel 0 may be used for wall mounted sender (primary used as master sender)
code = CODE_PREFIX + hex4(unitIncrement) + CODE_SUFFIX + unitId + CODE_21 + "00" + hex2(channel) + '00' + hex2(cmd_code)
else:
code = CODE_PREFIX + hex4(unitIncrement) + CODE_SUFFIX + unitId + CODE_21 + CODE_REMOTE + hex2(channel) + '00' + hex2(cmd_code)
return checksum(code) if with_checksum else code

def hex2(n):
Expand Down Expand Up @@ -309,60 +386,76 @@ def checksum(code):


def main(argv):
class Modes(enum.IntFlag):
Listen = 1
Send = 2
Stat = 4
Test = 256

try:
test_only = False
is_listen = False
is_send = False
is_stats = False
mod = ""
code = ""
cmd = ""
# mode can be either Listen, Send or Stat
mode = 0
# the channel to be used
channel = "0"
# the given argument(s)
argument = ""
# the device to use
device = DEFAULT_DEVICE_NAME

opts, _ = getopt.getopt(
argv, "hlits", ["checksum=", "device=", "channel=", "send=", "mod="])
argv, "hlits", ["checksum=", "device=", "channel=", "send=", "mod=", "add=", "remove="])

if len(opts) < 1:
showhelp()
return

for (opt, arg) in opts:
if opt == '-h':
showhelp()
return
elif opt in ['-s']:
is_stats = True
elif opt in ('-t'):
test_only = True
elif opt in ('--device'):
device = arg
elif opt in ('-l'):
is_listen = True
elif opt in ('--mod'):
mod = arg
elif opt in ('--channel'):
channel = arg
elif opt in ('--send'):
cmd = arg
is_send = True
elif opt in ("--checksum"):
code = arg

with Database() as db:

for (opt, arg) in opts:
if opt == '-h':
showhelp()
return
elif opt in ['-s']:
mode |= Modes.Stat
elif opt in ('-t'):
mode |= Modes.Test
elif opt in ('--device'):
device = arg
elif opt in ('-l'):
mode |= Modes.Listen
elif opt in ('--mod'):
unit = arg.split(':')
db.set_unit(unit)
mode |= Modes.Stat
elif opt in ('--add'):
db.add_unit(arg.split(':'))
mode |= Modes.Stat
elif opt in ('--remove'):
db.remove_unit(arg)
mode |= Modes.Stat
elif opt in ('--checksum'):
arg = checksum(arg)
if arg:
print(arg)
elif opt in ('--send'):
argument = arg
mode |= Modes.Send
elif opt in ('--channel'):
channel = arg

stick = USBStick(db, device)
if mod:
unit = mod.split(':')
db.set_unit(unit)
if is_stats:

test = False
if Modes.Test & mode:
test = True
mode ^= Modes.Test

if mode == Modes.Stat:
db.output()
if is_listen and not is_send:
elif mode == Modes.Listen:
stick.listen()
elif is_send and not is_listen:
stick.send(cmd, channel, test_only)
elif code:
code = checksum(code)
if code:
print(code)
elif mode == Modes.Send:
stick.send(argument, channel, test)

except getopt.GetoptError:
sys.exit(2)
Expand Down

0 comments on commit b043ae8

Please sign in to comment.