diff --git a/hshetl/cli.py b/hshetl/cli.py index 402963c6f0337ee69d205d8a5f7bbf3c88ae21e0..3732f6446f003af466de56ce2cd71320b11a82e3 100644 --- a/hshetl/cli.py +++ b/hshetl/cli.py @@ -19,7 +19,7 @@ import logging from connectors import connector_repository from entities import entity_repository from jobs import JobList -from exc import ConfigurationException, NotMatchingArgumentsException, NotMatchingYAMLTagException, UnknownNameReferenceException +from exc import ConfigurationException, NotMatchingArgumentsException, NotMatchingYAMLTagException, UnknownNameReferenceException, DuplicatedProcessException import hshetl @@ -156,16 +156,23 @@ class Controller(object): if self.config_file[:1] == '/': return self.config_file else: return self.working_directory + '/' + self.config_file + def abs_pid_file(self): + '''Resolves the name of the pid file related to this config.''' + return self.abs_config_file + '.pid' + def configure(self): '''Reads the yaml configuration from file and sets internal variables.''' try: + self.check_pid_file() self.jobs.extend(self._configuration_parser.parse(self.abs_config_file)) except IOError as ioe: sys.exit('Can\'t read configuration file: {}'.format(str(ioe))) except yaml.parser.ParserError as pe: sys.exit('Error while parsing YAML configuration: {}'.format(str(pe))) except ConfigurationException as ce: - sys.exit('Error in Configuration: {}'.format(str(ce))) + sys.exit('Error in configuration: {}'.format(str(ce))) + except DuplicatedProcessException as dpe: + sys.exit('Error on execution: {}'.format(str(dpe))) return self def execute(self): @@ -184,10 +191,25 @@ class Controller(object): except KeyboardInterrupt: logging.critical('The execution was manually interrupted.') finally: - connector_repository.shutdown() - entity_repository.shutdown() + self.shutdown() logging.info('The whole execution took {} seconds'.format(str(time.time() - self.start_time))) + def check_pid_file(self): + '''Creates a file holding the current process id, to check whether we currently run or not.''' + pidfile = self.abs_pid_file() + if os.path.isfile(pidfile): + with open(pidfile, 'r') as f: + raise DuplicatedProcessException('A process with pid %s currently works on the same configuration file (%s).' % (f.read(), self.abs_config_file)) + else: + with open(pidfile, 'w') as f: + f.write(str(os.getpid())) + + def shutdown(self): + '''Shutdown the repositories and remove the pid file.''' + connector_repository.shutdown() + entity_repository.shutdown() + os.unlink(self.abs_pid_file()) + def make_interactive(self, jobs): '''Applies interactive mode to all jobs.''' for job in jobs: diff --git a/hshetl/exc.py b/hshetl/exc.py index 099f83a8b8caa7e16952fb3d856acfe0a781a1e4..b70efa23cc965c86817d7b8db5947c6f2de4801d 100644 --- a/hshetl/exc.py +++ b/hshetl/exc.py @@ -78,3 +78,7 @@ class DataTypeConversionException(Exception): '''Raised if a value can not be converted to wanted type.''' pass + +class DuplicatedProcessException(Exception): + '''Raised if another process is working on the given config currently''' + pass \ No newline at end of file