All the code
All the code for this tutorial
This page contains all the code you should have when you finish the tutorial. The same code is also available in the repo as two files, "sample_simulation_side.py" and "sample_program_side.py".
Simulation side
# NOTE(review): reset() is called before the remaining algotron imports --
# presumably it clears state left over from a previous run; confirm against
# the algotron docs before reordering these lines.
from algotron import reset
reset()
from algotron.core import User, Simulation, Action
from algotron.hugnman import Hugnman
def _test(port, addr='localhost'):
    """Build the sample "myhome" simulation and run the tutorial scenario.

    Registers the 'sit', 'open' and 'close' actions, creates two chairs and a
    door with one sensor each, then lets a Hugnman sit on both chairs and
    open/close the door while a Sergeant created with
    ``client_addr=(addr, port)`` is active.

    Every ``print`` uses the single-argument call form, which produces the
    same output under Python 2 and also runs under Python 3.

    Args:
        port: Port of the program-side listening socket.
        addr: Host of the program-side listening socket.
    """
    print('----Setup----')

    # basic actions
    Action(name='sit',
           formula='x+hp*0.2',
           physics='pressure',
           hp='weight')
    Action(name='open', formula='1', physics='opened')
    Action(name='close', formula='0', physics='opened')

    # simulation setup
    User.login('aaron')
    myhome = Simulation('myhome', caching=True)

    # init objects
    chair1 = myhome.add_object('chair1')
    chair1.set_state('pressure', 20)
    chair2 = myhome.add_object('chair2')
    chair2.set_state('pressure', 18)
    door1 = myhome.add_object('door1')
    door1.set_state('opened', '0')

    # init sensors
    sensor1 = myhome.add_sensor('c1_p_sensor', 'chair1', rate=0.8, states=['pressure'])
    sensor2 = myhome.add_sensor('c2_p_sensor', 'chair2', rate=0.5, states=['pressure'])
    sensor3 = myhome.add_sensor('door1_sensor', 'door1', rate=0.9, states=['opened'])
    print('___Setup END___')

    jackman = Hugnman('hugn jackman', myhome)
    jackman.set('weight', 70)

    from algotron.watchdog import Sergeant
    sergeant = Sergeant(client_addr=(addr, port),
                        sim=myhome,
                        # ignores=['physics']
                        ).ready()

    # Always call detach(), even when one of the steps below raises.
    try:
        print('---- chair1 test ----')
        print('Pressure before sit: {}'.format(chair1.get_state('pressure')))
        print('Sensor1 pressure: {}'.format(sensor1.get_state('pressure')))
        jackman.do('sit chair1')
        print('Pressure after sit: {}'.format(chair1.get_state('pressure')))
        print('Sensor1 pressure: {}'.format(sensor1.get_state('pressure')))

        print('---- chair2 test ----')
        jackman.do('sit chair2')

        print('---- door1 test ----')
        print('door1: {}'.format(sensor3.get_state('opened')))
        jackman.do('open door1')
        print('door1: {}'.format(sensor3.get_state('opened')))
        jackman.do('close door1')
        print('door1: {}'.format(sensor3.get_state('opened')))
    finally:
        sergeant.detach()
# Pass the port of your program's listening socket.
# For details, see the "communication_sample program".
_test(port=33333)
Program side
from time import sleep
from random import random
def tick(data_manager):
    """Example program tick: receive each new change in the Simulation and echo it.

    Prints the current change, both chair objects, and the sensor readings
    for chair1, chair2 and door1. Every ``print`` uses the single-argument
    call form, which produces the same output under Python 2 and also runs
    under Python 3.

    Args:
        data_manager: Object providing ``get_change()``,
            ``get_object(name=...)`` and
            ``get_sensor_state(obj_name=..., state=...)``.
    """
    sleep(random() * 2)  # simulate the time cost of your program, 0-2 secs
    print('---- TICK ----')
    print('current change: {}'.format(data_manager.get_change()))
    print('chair1: {}'.format(data_manager.get_object(name='chair1')))
    print('chair2: {}'.format(data_manager.get_object(name='chair2')))
    print('c1 pressure sensor: {}'.format(data_manager.get_sensor_state(obj_name='chair1', state='pressure')))
    print('c2 pressure sensor: {}'.format(data_manager.get_sensor_state(obj_name='chair2', state='pressure')))
    print('door1 sensor: {}'.format(data_manager.get_sensor_state(obj_name='door1', state='opened')))
    # raise Exception('test error')  # uncomment this to see how it handles multi-processing error
from algotron.listener import Listener
# Create a Listener for localhost:33333 (the same port the simulation side
# passes to _test) and hand it the tick function.
# NOTE(review): presumably start() invokes tick for each change received -- confirm.
listener = Listener(loc='localhost', port=33333)
listener.start(tick)
Updated over 8 years ago