All the code

All the code for this tutorial

This page contains all the code you should have when you finish the tutorial. The same code is also available in the repository as two files, "sample_simulation_side.py" and "sample_program_side.py".

Simulation side

from algotron import reset
# NOTE(review): reset() runs before the other algotron imports below;
# presumably it clears state left over from a previous run — confirm.
reset()

from algotron.core import User, Simulation, Action
from algotron.hugnman import Hugnman

def _test(port, addr='localhost'):
    """
    Build the sample 'myhome' simulation and run the tutorial scenario.

    Defines the 'sit', 'open' and 'close' actions, logs in as 'aaron',
    creates two chairs and a door with one sensor each, attaches a Sergeant
    pointed at (addr, port), then lets a Hugnman sit on both chairs and
    open/close the door while printing object and sensor readings.

    port -- port half of the client_addr handed to the Sergeant
    addr -- host half of the client_addr handed to the Sergeant
    """
    # print() with a single argument behaves the same on Python 2 and 3.
    print('----Setup----')
    # basic actions
    # presumably 'hp' in the formula is bound to the actor's 'weight'
    # (set on jackman below) — confirm against the Action documentation
    Action(name='sit',
           formula='x+hp*0.2',
           physics='pressure',
           hp='weight')
    Action(name='open', formula='1', physics='opened')
    Action(name='close', formula='0', physics='opened')
    # simulation setup
    User.login('aaron')
    myhome = Simulation('myhome', caching=True)
    # init objects
    chair1 = myhome.add_object('chair1')
    chair1.set_state('pressure', 20)
    chair2 = myhome.add_object('chair2')
    chair2.set_state('pressure', 18)
    door1 = myhome.add_object('door1')
    door1.set_state('opened', '0')
    # init sensors
    sensor1 = myhome.add_sensor('c1_p_sensor', 'chair1', rate=0.8, states=['pressure'])
    # chair2's sensor is registered but never read directly in this function,
    # so its return value is not kept
    myhome.add_sensor('c2_p_sensor', 'chair2', rate=0.5, states=['pressure'])
    sensor3 = myhome.add_sensor('door1_sensor', 'door1', rate=0.9, states=['opened'])
    print('___Setup END___')

    jackman = Hugnman('hugn jackman', myhome)
    jackman.set('weight', 70)
    from algotron.watchdog import Sergeant
    sergeant = Sergeant(client_addr=(addr, port),
                        sim=myhome,
                        #ignores=['physics']
                        ).ready()
    try:
        print('---- chair1 test ----')
        print('Pressure before sit: {}'.format(chair1.get_state('pressure')))
        print('Sensor1 pressure: {}'.format(sensor1.get_state('pressure')))
        jackman.do('sit chair1')
        print('Pressure after sit: {}'.format(chair1.get_state('pressure')))
        print('Sensor1 pressure: {}'.format(sensor1.get_state('pressure')))
        print('---- chair2 test ----')
        jackman.do('sit chair2')
        print('---- door1 test ----')
        print('door1: {}'.format(sensor3.get_state('opened')))
        jackman.do('open door1')
        print('door1: {}'.format(sensor3.get_state('opened')))
        jackman.do('close door1')
        print('door1: {}'.format(sensor3.get_state('opened')))
    finally:
        # always detach the Sergeant, even if one of the steps above raises
        sergeant.detach()

# Pass the port that your program's listening socket is bound to.
# For details, see "communication_sample program".
_test(port=33333)

Program side

from time import sleep
from random import random

def tick(data_manager):
    """
    Example tick for your program: it receives each new change in the
    Simulation and deals with it.

    In this example it only echoes the result.

    data_manager -- object queried here through get_change(), get_object()
                    and get_sensor_state()
    """
    sleep(random()*2) # simulate the time cost of your program, 0-2 secs
    # print() with a single argument behaves the same on Python 2 and 3.
    print('---- TICK ----')
    print('current change: {}'.format(data_manager.get_change()))
    print('chair1: {}'.format(data_manager.get_object(name='chair1')))
    print('chair2: {}'.format(data_manager.get_object(name='chair2')))
    print('c1 pressure sensor: {}'.format(data_manager.get_sensor_state(obj_name='chair1', state='pressure')))
    print('c2 pressure sensor: {}'.format(data_manager.get_sensor_state(obj_name='chair2', state='pressure')))
    print('door1 sensor: {}'.format(data_manager.get_sensor_state(obj_name='door1', state='opened')))
    #raise Exception('test error') # uncomment this to see how it handles multi-processing error

from algotron.listener import Listener
# The port here must match the one passed to _test() on the simulation side
# (33333 in this tutorial).
listener = Listener(loc='localhost', port=33333)
# Hand the tick function to the listener.
listener.start(tick)