python - Why might a session not be current? -


I have a unit test defined as shown below. The setup works correctly. The config is a dictionary holding the location of the MySQL executable and the name of the database.

import unittest

# mercury
from datainterface import sqldatasources
from candc import blankslate
from candc import configuration

# under test
from candc import addtask

# This is a temporary measure until a more elegant configuration is implemented.
configpath = '/somewhere/in/my/filesystem'


class addtasktest(unittest.TestCase):
    """Test adding a task."""

    def setUp(self):
        """Blank out the database before each test.

        The method must be spelled ``setUp``: unittest only invokes the
        fixture hook under that exact name.
        """
        config = configuration.config(configpath)
        blankslate.wipe(config)

    def test_addandretreive(self):
        """Add three tasks, then retrieve them through a fresh session."""
        config = configuration.config(configpath)
        source = sqldatasources.source(config.sqlcredentials())

        # of type sqlalchemy.orm.session.Session
        session = source.sqlsession()
        try:
            # insert test tasks
            for _ in range(3):
                addtask.insert('tests/candc/test_task', session)
        finally:
            # Close the writing session even if an insert raises.
            session.close()

        # retrieve the test tasks
        # ask for more than should be in there
        session = source.sqlsession()
        # Make sure the reading session is released whatever the outcome.
        self.addCleanup(session.close)
        tasks = source.opentasksready(4, session)

        # three should be returned
        self.assertEqual(len(tasks), 3)

The source class looks like this:

"""Encapsulates data access to MySQL via SQLAlchemy.

Maintains a stateful connection to the database.
"""

from actionresources import workhandlers

__author__ = 'packet-racket'

# sqlalchemy connection
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# mercury objects
from mercuryalchemy import schemaobjects

# needed built-ins
from datetime import datetime


class source:
    """Owns the SQLAlchemy engine and hands out sessions bound to it."""

    def __init__(self, connectionparameters):
        """Create the engine and a session factory bound to it.

        connectionparameters -- passed unchanged to ``create_engine``.
            Example of the URL form used during development:
            'mysql+mysqldb://root:@localhost:3306/mercury'
            NOTE(review): the original docstring called this a dict, but
            ``create_engine`` expects a database URL -- confirm with callers.
        """
        # kludge -- pool_recycle
        self.engine = create_engine(connectionparameters, pool_recycle=3600)
        # Session factory; calling it produces a new session on this engine.
        self.session = sessionmaker(bind=self.engine)

    def add(self, schemaobject, sess):
        """Add a schema object to the given session.

        schemaobject -- a schemaobjects.<whatever> instance
        sess -- the session to add it to

        Only ``sess.add`` is called here; committing is left to the caller.
        """
        sess.add(schemaobject)

    def sqlsession(self):
        """Return a new session to work with."""
        return self.session()

    def opentasksready(self, thismany, asession):
        """Return up to ``thismany`` open tasks that are ready to start.

        thismany -- integer, maximum number of tasks to return
        asession -- the session to query with

        A task qualifies when its status is 'open' and its start_time is
        not later than the current local time.

        NOTE(review): the cut-off is ``datetime.now()`` with microsecond
        precision. If the start_time column stores coarser values and the
        database rounds on insert, a task written with "now" as its start
        time may compare as slightly in the future and be skipped by a
        query issued immediately afterwards -- confirm the column's
        precision.
        """
        ready = (
            asession.query(schemaobjects.task)
            .filter(schemaobjects.task.status == 'open')
            .filter(schemaobjects.task.start_time <= datetime.now())
        )
        # The original ignored ``thismany``; honour the documented cap.
        return ready.limit(thismany).all()

addtask looks like this:

import ast
from datetime import datetime

# mercury imports
from actionresources import workhandlers
from datainterface import sessionhelpers


def insert(filename, session):
    """Insert the task described by a file.

    filename -- path to a file containing a Python dict literal with at
        least 'work_handler' and 'description' keys, and optionally
        'start_time' formatted like '2014-09-19 15:19:50.414395'
    session -- SQLAlchemy session used to add and commit the task

    Raises Exception when the named work handler is not known. Any error
    raised by the commit is re-raised after the session is rolled back.
    """
    # Text mode so the content is str on every Python version; the original
    # opened the file as 'rb' and then joined with a str separator.
    with open(filename, 'r') as athand:
        text = athand.read()

    # Combine the file into one line, getting rid of newlines and of the
    # blank elements they leave behind.
    flattened = ''.join(piece for piece in text.split('\n') if piece)

    # literal_eval only accepts Python literals, so it does not execute code.
    inputdict = ast.literal_eval(flattened)

    # TODO -- maybe there is a cleaner way to do this.
    # Needs to be changed manually for every new workhandler -- big kludge.
    # How it should be done: scan the workhandlers module for one matching
    # the name given in the input, then create and insert one of that type.
    mappings = {
        'createonedeleteold': workhandlers.createonedeleteold,
        'donothing': workhandlers.donothing,
    }

    # Map the (case-insensitive) handler name to the class to instantiate.
    specifiedhandler = inputdict['work_handler'].lower()
    handlerclass = mappings.get(specifiedhandler)
    if handlerclass is None:
        raise Exception('invalid work handler named.')

    now = datetime.now()
    if 'start_time' in inputdict:
        # format: '2014-09-19 15:19:50.414395'
        # Directives are case-sensitive: %Y four-digit year, %H hour,
        # %M minute, %S second, %f microsecond.
        start_time = datetime.strptime(inputdict['start_time'],
                                       '%Y-%m-%d %H:%M:%S.%f')
    else:
        start_time = now

    task = handlerclass(description=inputdict['description'],
                        status='open',
                        created=now,
                        start_time=start_time)

    # insert the task
    session.add(task)
    try:
        session.commit()
    except Exception:
        # Leave the session usable for the caller after a failed commit.
        session.rollback()
        raise

sqldatasources is my own class that interacts with the SQLAlchemy setup. Its methods take a session and use it to retrieve or insert data. The problem is that the retrieval doesn't always fetch the objects: it fails intermittently. If I run in debug mode and pause before the line tasks = source.opentasksready(4, session), the test passes. What is the problem?


Comments

Popular posts from this blog

php - Submit Form Data without Reloading page -

linux - Rails running on virtual machine in Windows -