documentation & versioning

v2.0
Steve Nyemba 9 months ago
parent 2d976cd607
commit d80b00106b

@ -12,9 +12,8 @@ We wrote this frame to be used in both command line or as a library within in yo
| -------- | --- |
|X12 claims/remits| parsing of {x12} claims/remittances into JSON format with human-readable attributes|
|Multi Processing| capable of processing multiple files simultaneously to speed up processing|
|Analytics support| descriptive statistical analytics : distribution, various counts|
|Process Recovery| capable of recovering interrupted runs|
|Export to RDBMS| exports data to relational format (NoSQL -> SQL) 7 supported databases <br> * PostgreSQL,<br> * Redshift, <br>* Netezza, <br>* MySQL, <br>* MariaDB, <br>* BigQuery, <br>* SQLite3 |
|Export to RDBMS| exports data to relational format (NoSQL -> SQL) 8 supported industry-standard databases <br> * PostgreSQL,<br> * Redshift, <br>* Netezza, <br>* MySQL, <br>* MariaDB, <br>* BigQuery, <br>* SQLite3,<br>* Databricks |
|**Issues and Bug reports**| info@the-phi.com
@ -31,96 +30,26 @@ For advanced features visit [Healthcare/IO::Parser](https://healthcareio.the-phi
**Installation command**
pip install --upgrade git+https://hiplab.mc.vanderbilt.edu/git/lab/parse-edi.git
pip install --upgrade git+https://hiplab.mc.vanderbilt.edu/git/hiplab/parser.git
## Usage
Healthcare/IO is primarily intended to be used as a command line parser (for now). It is written entirely in Python 3+
Healthcare/IO is primarily intended to be used as a command line parser (for now). However, it can also be used as a library that you integrate into custom code. It is written entirely in Python 3+ and released under the MIT License
**CLI :**
**Learning More**
1. Sign up to get the parsing configuration
The parser is driven by a configuration file that specifies which fields to parse and how to parse them. You need to sign up to get a copy of the configuration file.
#
# Use sqlite as data-store
healthcare-io.py --signup <email> [--store <mongo|sqlite>]
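For orientation, here is a minimal sketch of reading the configuration file that the signup step writes. The default location `~/.healthcareio/config.json` and the `out-folder`/`store` keys come from the CLI code in this commit. The helper name `load_config` is hypothetical and is not part of the package.

```python
import json
import os

# Default location used by healthcare-io.py when --config is not passed
DEFAULT_CONFIG = os.path.join(os.path.expanduser("~"), ".healthcareio", "config.json")


def load_config(path=DEFAULT_CONFIG):
    """Hypothetical helper: load the parser configuration written by --signup.

    Returns None when the file does not exist (the user has not signed up yet).
    """
    if not os.path.exists(path):
        return None
    with open(path) as f:
        info = json.load(f)
    # signup() records these keys, so fail early if either one is missing
    missing = [key for key in ("out-folder", "store") if key not in info]
    if missing:
        raise KeyError(f"configuration is missing: {missing}")
    return info
```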
2. Check the version
Occasionally the attributes in the configuration file may change. This command determines whether a new version is available.
healthcare-io.py --check-update
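The comparison behind `--check-update` can be sketched as follows. It assumes, as the CLI code does, that both the local configuration and the server's `/version` response carry a `current` field, and that a missing local version counts as `0.0`. The function name `needs_upgrade` is hypothetical, and the HTTP request is left out so the logic can run offline.

```python
def needs_upgrade(local_config, remote_version):
    """Return True when the server advertises a different configuration version.

    local_config   : dict loaded from config.json (may lack a "version" key)
    remote_version : dict returned by <url>/version, e.g. {"current": 2.0}
    """
    local = local_config.get("version", {"current": 0.0})
    # The CLI compares with != rather than <, so any mismatch triggers the notice
    return local["current"] != remote_version["current"]
```

Because the check uses inequality, a local configuration that is *newer* than the server's is also reported as needing an upgrade.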
3. Parse data stored in a folder
The parser recursively traverses a directory containing claims and/or remittances.
healthcare-io.py --parse --folder <path> [--batch <n>] [--resume]
with:
--parse tells the engine to parse the claims or remits found in the folder
--folder location of the claims|remits
--batch number of processes to spawn to parse the files
--resume tells the parser to resume parsing if not all files were processed or if new files were added to the folder
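The way `--folder` and `--batch` interact can be sketched as follows: the folder is walked recursively, and the resulting file list is cut into `--batch` contiguous chunks, one per parser process. The CLI uses `numpy.array_split` for the split. The pure-Python `split_batches` below is a hypothetical stand-in that produces the same chunk sizes, and `collect_files` is likewise a hypothetical name.

```python
import os


def collect_files(folder):
    """Recursively list every file under folder, as --parse --folder does."""
    files = []
    for root, _dirs, names in os.walk(folder):
        files += [os.path.join(root, name) for name in names]
    return files


def split_batches(files, batch):
    """Split files into `batch` contiguous chunks whose sizes differ by at most one.

    The first len(files) % batch chunks get one extra file, matching numpy.array_split.
    """
    size, extra = divmod(len(files), batch)
    chunks, start = [], 0
    for i in range(batch):
        end = start + size + (1 if i < extra else 0)
        chunks.append(files[start:end])
        start = end
    return chunks
```

With `--batch 2` and five files, the first process would receive three files and the second two. When there are more processes than files, some chunks are empty.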
4. Export data to a relational data-store
The parser exports data into other data-stores as relational tables, allowing users to construct views that support a variety of studies.
healthcare-io.py --export <835|837> --export-config <path-export.json>
with:
--export-config path of the configuration file that describes the target data-store
**example**
1. Exporting to PostgreSQL
{"provider":"postgresql","database":"healthcareio","schema":"foo"}
**NOTE**
The output is a set of tables produced by transforming unstructured data into a relational structure. The tables can be joined on the attribute **_id**
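To show what joining the exported tables on `_id` looks like, the sketch below builds two toy tables in an in-memory SQLite database and defines a view over them. The child table name `claims_procedures` and the columns `amount` and `code` are hypothetical placeholders. The real table layout depends on the parser configuration and the export target.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical layout: one parent row per claim, child rows sharing its _id
conn.execute("CREATE TABLE claims (_id TEXT, amount REAL)")
conn.execute("CREATE TABLE claims_procedures (_id TEXT, code TEXT)")
conn.executemany("INSERT INTO claims VALUES (?, ?)", [("c1", 100.0), ("c2", 50.0)])
conn.executemany(
    "INSERT INTO claims_procedures VALUES (?, ?)",
    [("c1", "99213"), ("c1", "87880"), ("c2", "99214")],
)
# A view that reunites each claim with its procedures through _id
conn.execute(
    """CREATE VIEW claim_procedures AS
       SELECT c._id, c.amount, p.code
       FROM claims c JOIN claims_procedures p ON p._id = c._id"""
)
rows = conn.execute("SELECT _id, code FROM claim_procedures ORDER BY _id, code").fetchall()
print(rows)  # [('c1', '87880'), ('c1', '99213'), ('c2', '99214')]
```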
The configuration file needed to implement export is modelled after the following template:
{
"provider":"<postgresql|redshift|mysql|mariadb>",
"database":"<name>",["host":"server-name"],["port":5432],
["user":"me"],["password":"!@#z4qm"],["schema":"target-schema"]
}
**parameters:**
provider postgresql, redshift, mysql or mariadb (supported providers)
database name of the database
**optional:**
schema name of the target schema. If not provided, the default schema is assumed
host host of the database. If not provided, localhost is assumed
port port of the database. If not provided, the default port is used
user database user name. If not provided, we assume the server's security settings trust the connection
password password of the database user. If not set, we assume the server's security settings trust the connection
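As a sketch of the template above, the hypothetical helper `export_config` builds an `--export-config` dictionary and rejects unknown keys. `effective_settings` then shows which values apply when the optional keys are omitted. The per-provider port numbers are the standard server defaults (PostgreSQL 5432, Redshift 5439, MySQL/MariaDB 3306). They are an assumption about what "the default" means here, not values read from the parser.

```python
import json

# Standard server default ports (assumed; not taken from the parser code)
DEFAULT_PORTS = {"postgresql": 5432, "redshift": 5439, "mysql": 3306, "mariadb": 3306}


def export_config(provider, database, **optional):
    """Hypothetical helper: build an --export-config dictionary.

    provider and database are required; host, port, user, password and schema
    are optional, mirroring the template above.
    """
    if provider not in DEFAULT_PORTS:
        raise ValueError(f"unsupported provider: {provider}")
    unknown = set(optional) - {"host", "port", "user", "password", "schema"}
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")
    config = {"provider": provider, "database": database}
    config.update(optional)
    return config


def effective_settings(config):
    """Fill in the documented fallbacks: localhost and the provider's default port."""
    settings = dict(config)
    settings.setdefault("host", "localhost")
    settings.setdefault("port", DEFAULT_PORTS[config["provider"]])
    return settings


# The PostgreSQL example from this README, serialized as it would appear on disk
print(json.dumps(export_config("postgresql", "healthcareio", schema="foo")))
```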
- More can be found at https://healthcareio.the-phi.com/
- The source code & example code are at https://hiplab.mc.vanderbilt.edu/git/hiplab/parser
**Known Limitations**
1. By default it does NOT come with all {X12} segments.
2. There is no easy way to rename the attributes it parses
2. Renaming attributes requires writing plugins
3. Upgrading the configuration may require dropping tables
4. For now, it can only read {x12} from disk
4. For now, it can only read {x12} from disk (or an S3 bucket)
There is support for additional features and attributes available at [Healthcare/IO::Parser](https://healthcareio.the-phi.com/parser).
**In development**
1. Wizard/UI to enable attribute renaming
2. Dashboard for quick overview
3. Reading {x12} from S3 and other cloud buckets
4. Docker Image
## Credits

@ -10,11 +10,11 @@ RUN ["apt-get","-y","install","apt-utils"]
RUN ["apt","update","--fix-missing"]
RUN ["apt-get","upgrade","-y"]
RUN ["apt-get","install","-y","mongo","sqlite3","sqlite3-pcre","libsqlite3-dev","python3-dev","python3","python3-pip","git","python3-virtualenv","wget"]
RUN ["apt-get","install","-y","mongo","sqlite3","sqlite3-pcre","libsqlite3-dev","python3-dev","python3","python3-pip","git","wget"]
#
#
RUN ["pip3","install","--upgrade","pip"]
# RUN ["pip3","install","git+https://healthcare.the-phi.com/git/code/parser.git","botocore"]
RUN ["pip3","install","healthcareio@git+https://healthcare.the-phi.com/git/code/parser.git"]
USER health-user
#
# This volume is where the data will be loaded from (otherwise it is assumed the user will have it in the container somehow)
@ -24,6 +24,7 @@ VOLUME ["/data"]
# This is the port from which some degree of monitoring can/will happen
EXPOSE 80
# wget https://healthcareio.the-phi.com/git/code/parser.git/bootup.sh
RUN ["wget","https://github.com/sosedoff/pgweb/releases/download/v0.14.2/pgweb_linux_amd64.zip"]
COPY bootup.sh bootup.sh
ENTRYPOINT ["bash","-C"]
CMD ["bootup.sh"]

@ -24,3 +24,4 @@ import healthcareio.params as params
from healthcareio import logger
# from healthcareio import server
import meta

@ -0,0 +1,3 @@
{
"provider":"postgresql","database":"healthcareio","schema":"public","context":"write"
}

@ -0,0 +1,397 @@
#!/usr/bin/env python3
"""
(c) 2019 Claims Toolkit,
Health Information Privacy Lab, Vanderbilt University Medical Center
Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human-readable JSON format.
The claims/output can be forwarded to a NoSQL data store like CouchDB or MongoDB
Usage :
    Commandline :
        python edi-parser --scope --config <path> --folder <path> --store <mongo|disk|couch> --<db|path> <id|path>
        with :
            --scope     <claims|remits>
            --config    path of the x12 configuration to use, i.e. 835 or 837
            --folder    location of the files (they must be decompressed)
            --store     data store: disk, mongodb or couchdb
            --db|path   name of the folder to store the output, or the database name
    Embedded in Code :
        import edi.parser
        import json
        filename = '/data/claim_1.x12'
        conf = json.loads(open('config/837.json').read())
        edi.parser.get_content(filename,conf)
"""
from healthcareio.params import SYS_ARGS
from transport import factory
import requests
from healthcareio import analytics
from healthcareio import server
from healthcareio.parser import get_content
import os
import json
import sys
import numpy as np
from multiprocessing import Process
import time
from healthcareio import x12
from healthcareio.export import export
import smart
import transport
from healthcareio.server import proxy
import pandas as pd
PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
INFO = None
URL = "https://healthcareio.the-phi.com"
if not os.path.exists(PATH) :
    os.mkdir(PATH)
import platform
import sqlite3 as lite
# PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
CONFIG_FILE = os.sep.join([PATH,'config.json']) if 'config' not in SYS_ARGS else SYS_ARGS['config']
HELP_MESSAGE = """
cli:
#
# Signup, allows parsing configuration to be downloaded
#
# Support for SQLite3
healthcare-io.py --signup steve@the-phi.com --store sqlite
#or support for mongodb
healthcare-io.py --signup steve@the-phi.com --store mongo
healthcare-io.py --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
healthcare-io.py --parse --folder <path> [--batch <value>] [--resume]
healthcare-io.py --check-update
healthcare-io.py --export <835|837> --export-config <config-path>
action :
--signup|init signup user and get configuration file
--parse starts parsing
--check-update checks for updates
--export export data of a 835 or 837 into another database
parameters :
--<[signup|init]> signup or get a configuration file from a parsing server
--folder location of the files (the program will recursively traverse it)
--store data store: sqlite or mongo
--resume will attempt to resume if there was an interruption
"""
def signup (**args) :
    """
    :email  user's email address
    :url    url of the provider to signup
    """
    email = args['email']
    url = args['url'] if 'url' in args else URL
    folders = [PATH,OUTPUT_FOLDER]
    for path in folders :
        if not os.path.exists(path) :
            os.mkdir(path)
    #
    #
    store = args['store'] if 'store' in args else 'sqlite'
    # fall back to the same default database name used by the CLI
    headers = {"email":email,"client":platform.node(),"store":store,"db":args.get('db','healthcareio')}
    http = requests.session()
    r = http.post(url,headers=headers)
    #
    # store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}}
    # if 'store' in args :
    #     store = args['store']
    # filename =  (os.sep.join([PATH,'config.json']))
    filename = CONFIG_FILE
    info = r.json() #{"parser":r.json(),"store":store}
    info = dict({"owner":email},**info)
    info['store']['args']['path'] = os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql
    info['out-folder'] = OUTPUT_FOLDER
    file = open( filename,'w')
    file.write( json.dumps(info))
    file.close()
    _m = """
        Thank you for signing up!!
        Your configuration file is stored in :path,
            - For more information visit https://healthcareio.the-phi.com/parser
            - Access the source at https://healthcareio.the-phi.com/git/code/parser
    """.replace(":path",CONFIG_FILE)
    print (_m)
#
# Create the sqlite3 database to
def log(**args):
    """
    This function will perform a log of anything provided to it
    """
    pass
def init():
    """
    read all the configuration from disk.
    Requirements for configuration file :
        {out-folder,store,837,835 }
    """
    # filename = os.sep.join([PATH,'config.json'])
    filename = CONFIG_FILE
    info = None
    if os.path.exists(filename):
        #
        # Loading the configuration file (JSON format)
        file = open(filename)
        info = json.loads(file.read())
        file.close()
        # signup() stores the output folder under the 'out-folder' key
        if 'out-folder' not in info and not os.path.exists(OUTPUT_FOLDER) :
            os.mkdir(OUTPUT_FOLDER)
        elif 'out-folder' in info and not os.path.exists(info['out-folder']) :
            os.mkdir(info['out-folder'])
        # if 'type' in info['store'] :
        lwriter = None
        is_sqlite = False
        if 'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter' and not os.path.exists(info['store']['args']['path']) :
            lwriter = transport.factory.instance(**info['store'])
            is_sqlite = True
        elif 'provider' in info['store'] and info['store']['provider'] == 'sqlite' :
            lwriter = transport.instance(**info['store'])
            is_sqlite = True
        if lwriter and is_sqlite:
            for key in info['schema'] :
                if key != 'logs' :
                    _id = 'claims' if key == '837' else 'remits'
                else:
                    _id = key
                if not lwriter.has(table=_id) :
                    lwriter.apply(info['schema'][key]['create'])
            # [lwriter.apply( info['schema'][key]['create']) for key in info['schema'] if not lwriter.has(table=key)]
            lwriter.close()
    return info
def upgrade(**args):
    """
    :email  provide us with who you are
    :key    upgrade key provided by the server for a given email
    """
    url = args['url'] if 'url' in args else URL+"/upgrade"
    headers = {"key":args['key'],"email":args["email"],"url":url}
def check(**_args):
    """
    This function will check if there is an update available (versions are in the configuration file)
    :param url
    """
    url = _args['url'][:-1] if _args['url'].endswith('/') else _args['url']
    url = url + "/version"
    if 'version' not in _args :
        version = {"_id":"version","current":0.0}
    else:
        version = _args['version']
    http = requests.session()
    r = http.get(url)
    return r.json()
if __name__ == '__main__' :
    info = init()
    if 'out-folder' in SYS_ARGS :
        OUTPUT_FOLDER = SYS_ARGS['out-folder']
    SYS_ARGS['url'] = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
    if set(list(SYS_ARGS.keys())) & set(['signup','init']):
        #
        # This command will essentially get a new copy of the configurations
        # @TODO: Tie the request to a version ?
        #
        email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
        url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
        store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
        db = 'healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
        signup(email=email,url=url,store=store,db=db)
    # else:
    #     m = """
    #         usage:
    #             healthcareio --signup --email myemail@provider.com [--url <host>]
    #     """
    #     print (m)
    elif 'upgrade' in SYS_ARGS :
        #
        # perform an upgrade i.e some code or new parsers information will be provided
        #
        pass
    elif 'parse' in SYS_ARGS and info:
        """
        In this section of the code we are expecting the user to provide :
            :folder location of the files to process or file to process
            :
        """
        files = []
        if 'file' in SYS_ARGS :
            files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
        if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
            for root,_dir,f in os.walk(SYS_ARGS['folder']) :
                if f :
                    files += [os.sep.join([root,name]) for name in f]
            # names = os.listdir(SYS_ARGS['folder'])
            # files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))]
        else:
            #
            # raise an error
            pass
        #
        # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't
        #
        if 'resume' in SYS_ARGS :
            store_config = json.loads( (open(CONFIG_FILE)).read() )
            files = proxy.get.resume(files,store_config )
            # print (["Found ",len(files)," files unprocessed"])
        #
        # @TODO: Log this here so we know what is being processed or not
        SCOPE = None
        if files : #and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']):
            BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
            files = np.array_split(files,BATCH_COUNT)
            procs = []
            index = 0
            for row in files :
                row = row.tolist()
                # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
                # proc = Process(target=apply,args=(row,info['store'],_info,))
                # parser = x12.Parser(os.sep.join([PATH,'config.json']))
                parser = x12.Parser(CONFIG_FILE)
                parser.set.files(row)
                parser.start()
                procs.append(parser)
                # index = index + 1
            while len(procs) > 0 :
                procs = [proc for proc in procs if proc.is_alive()]
                time.sleep(2)
            uri = OUTPUT_FOLDER
            store_config = json.loads( (open(CONFIG_FILE)).read() )['store']
            if 'type' in store_config :
                uri = store_config['args']['host'] if 'host' in store_config['args'] else ( store_config['args']['path'] if 'path' in store_config['args'] else store_config['args']['database'])
                if 'SQLite' in store_config['type']:
                    provider = 'sqlite'
                elif 'sql' in store_config['type'] :
                    provider = 'SQL'
                else:
                    provider = 'mongo'
            else:
                provider = store_config['provider']
            _msg = """
                Completed parsing. The data is available in the :provider database at :uri
                Logs are also available for errors and for compiling summary statistics
            """.replace(":provider",provider).replace(":uri",uri)
            print (_msg)
        pass
    elif 'analytics' in SYS_ARGS :
        PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
        DEBUG = int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
        SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
        #
        #
        # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
        if os.path.exists(CONFIG_FILE) :
            e = analytics.engine(CONFIG_FILE) #--@TODO: make the configuration file globally accessible
            e.apply(type='claims',serialize=True)
            SYS_ARGS['engine'] = e
            SYS_ARGS['config'] = json.loads(open(CONFIG_FILE ).read())
        else:
            SYS_ARGS['config'] = {"owner":None,"store":None}
        if 'args' not in SYS_ARGS['config'] :
            SYS_ARGS['config']["args"] = {"batch":1,"resume":True}
        #
        # folder is mandatory
        # SYS_ARGS['config']['args']['folder'] = SYS_ARGS['folder']
        # pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
        # pthread = Process(target=pointer,args=())
        # pthread.start()
    elif 'check-update' in SYS_ARGS :
        _args = {"url":SYS_ARGS['url']}
        try:
            if os.path.exists(CONFIG_FILE) :
                SYS_ARGS['config'] = json.loads(open(CONFIG_FILE ).read())
            else:
                SYS_ARGS['config'] = {}
            if 'version' in SYS_ARGS['config'] :
                _args['version'] = SYS_ARGS['config']['version']
            version = check(**_args)
            _version = {"current":0.0} if 'version' not in SYS_ARGS['config'] else SYS_ARGS['config']['version']
            if _version['current'] != version['current'] :
                print ()
                print ("You need to upgrade your system to version ",version['current'])
                print ("\t- signup (for new configuration)")
                print ("\t- use pip to upgrade the codebase")
            else:
                print ()
                print ("You are running the current configuration version ",_version['current'])
        except Exception as e:
            print (e)
            pass
    elif 'export' in SYS_ARGS:
        #
        # this branch exports parsed 835/837 data into the relational data-store described by --export-config
        #
        path = SYS_ARGS['export-config'] if 'export-config' in SYS_ARGS else ''
        X12_TYPE = SYS_ARGS['export'] if 'export' in SYS_ARGS else '835'
        if not os.path.exists(path) or X12_TYPE not in ['835','837']:
            print (HELP_MESSAGE)
        else:
            #
            # Let's run the export function ..., This will push files into a data-store of choice Redshift, PostgreSQL, MySQL ...
            #
            # _store = {"type":"sql.SQLWriter","args":json.loads( (open(path) ).read())}
            _store = json.loads( (open(path) ).read())
            pipes = export.Factory.instance(type=X12_TYPE,write_store=_store,config = CONFIG_FILE) #"inspect":0,"cast":0}})
            # pipes[0].run()
            # print (pipes)
            for thread in pipes:
                if 'table' in SYS_ARGS and SYS_ARGS['table'] != thread.table :
                    continue
                thread.start()
                time.sleep(1)
                thread.join()
    else:
        print(HELP_MESSAGE)
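The completion message of the `--parse` branch derives a provider label and a location from the `store` entry of `config.json`. That branching can be isolated as a small, testable function. `describe_store` is a hypothetical name, and unlike the CLI it returns `None` rather than raising when no host, path or database is present. The store types in the usage checks (`disk.SQLiteWriter`, `sql.SQLWriter`) appear in this commit; `mongo.MongoWriter` is an assumed example.

```python
def describe_store(store, default_uri):
    """Return (provider, uri) the way the parse branch reports where data landed.

    store : the "store" entry of config.json. Older configurations use
            {"type": ..., "args": {...}}; newer ones carry a "provider" key.
    """
    uri = default_uri
    if "type" in store:
        args = store["args"]
        # Prefer a host, then a file path (SQLite), then a database name
        uri = args.get("host", args.get("path", args.get("database")))
        if "SQLite" in store["type"]:
            provider = "sqlite"
        elif "sql" in store["type"]:
            provider = "SQL"
        else:
            provider = "mongo"
    else:
        # provider-style configurations keep the output folder as the location
        provider = store["provider"]
    return provider, uri
```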

@ -0,0 +1,38 @@
from datetime import datetime
import transport
import copy
import json
import pandas as pd
class X12Logger :
    def __init__(self,**_args) :
        self._store = copy.deepcopy(_args['store'])
        self._store['table'] = 'logs'
        self._store['context'] = 'write'
    def log(self,**_args):
        _date = datetime.now()
        _info = {'date':'-'.join([str(_date.month),str(_date.day),str(_date.year)])}
        for key in ['module','action','data'] :
            value = 'NA' if key not in _args else _args[key]
            value = value if type(value) not in [dict,list] else json.dumps(value)
            _info[key] = value
            # print ([key, type(value) in [dict,list], type(value)])
        #
        # Storing the whole thing
        try:
            _xwriter = transport.factory.instance(**self._store)
            # if not _xwriter.has(table=self._store['table']) :
            #     table = self._store['table']
            #     sql = f'''CREATE TABLE {table} (date date,module char(255), data JSON)'''
            #     print (sql)
            #     _xwriter.apply(sql)
            _df = pd.DataFrame([_info])
            _xwriter.write(_df)
            if hasattr(_xwriter,'close') :
                _xwriter.close()
        except Exception as e:
            print ([e])
            pass
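The row that `X12Logger.log` writes can be reproduced without a data-store. The sketch below builds the same record: a month-day-year date without zero padding, the `module`, `action` and `data` keys defaulting to `"NA"`, and dict or list values serialized to JSON text. `build_log_record` and its `when` parameter are hypothetical; `when` exists only so the date can be fixed for testing. Writing the record through `transport` is not reproduced here.

```python
import json
from datetime import datetime


def build_log_record(when=None, **fields):
    """Build the single row that X12Logger.log hands to the "logs" table writer."""
    when = when or datetime.now()
    record = {"date": "-".join([str(when.month), str(when.day), str(when.year)])}
    for key in ("module", "action", "data"):
        value = fields.get(key, "NA")
        # Structured values are stored as JSON text so they fit a single column
        record[key] = json.dumps(value) if isinstance(value, (dict, list)) else value
    return record
```

In the logger, this dictionary is wrapped as `pd.DataFrame([record])` before being written.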

@ -0,0 +1,144 @@
import numpy as np
import os
"""
This file contains utilities that will be used accross the x12 framework/platform
@TODO:
- Provisions with multiprocessing (locks/releases)
"""
class ContentHandler :
"""
This class implements {x12} content handling
"""
def split (self,_stream) :
if type(_stream) == str :
_xchar = '~\n' if '~\n' in _stream else ('~' if '~' in _stream else ('\n' if '\n' in _stream else None))
if _xchar :
_xchar = ''.join(_xchar)
_rows = _stream.split(_xchar)
return [row.strip().split('*') for row in _rows if row.strip()]
else:
return _stream.split('*')
def classify(self,_content):
"""
This function is designed to split claim information from the rest of the information (envelope header)
:_content The file content (already split by row and seperator)
"""
_indexes = [1 if 'HL' in line else 0 for line in _content]
_indexes = [_index for _index,_value in enumerate(_indexes) if _value == 1]
#
# At this point we know how many claims are in the file (log this somewhere)
#
_beg = 0
_end = _indexes[0]
_header = _content[_beg:_end]
_block = []
for _index,_beg in enumerate(_indexes) :
if _index + 1 == len(_indexes) :
_end = len(_content)
else:
_end = _indexes[_index + 1]
_block.append(_content[_beg:_end])
return {'header':_header,'block':_block}
    def merge (self,_x,_y):
        """
        This function will merge two objects _x, _y
        """
        _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
        if _zcols :
            _out = dict(_x,**{})
            for _key in _y.keys() :
                if not _key in _zcols :
                    _out[_key] = _y[_key]
                else:
                    if type(_out[_key]) == list :
                        # build a new list so that _x is not modified in place
                        _out[_key] = _out[_key] + _y[_key]
                    elif type(_out[_key]) == dict:
                        _out[_key] = dict(_out[_key],**_y[_key])
                    else:
                        _out[_key] = _y[_key]
            return _out
        else:
            return dict(_x,**_y)
    def _inspect_row(self,**_args):
        """
        This function makes sure the indexes actually exist in the row
        :row        row to be parsed (already split)
        :indexes    list of indexes
        :columns    columns to be used in the creation of the object
        """
        _max = np.max(_args['indexes'])
        _len = np.size(_args['row']) -1
        #
        # True only when every index falls inside the row and each index has a column name
        return _max <= _len and np.size(_args['indexes']) == np.size(_args['columns'])
    def _parse (self,**_args):
        """
        This function will parse an x12 element given
        :row        row of the x12 element
        :_columns   attributes of the object to be returned
        :_indexes   indexes of interest
        """
        _row = _args['row']
        _meta = _args['meta']
        _columns = _args['columns']
        _indexes = np.array(_args['indexes'])
        if not self._inspect_row (**_args) :
            #
            # Minimizing parsing errors by padding the line (only when it is too short)
            _delta = 1+ np.max(_indexes) - np.size(_row)
            if _delta > 0 :
                _row = _row + np.repeat('',_delta).tolist()
            #
            # @TODO: Log that the rows were padded
            #
        _row = np.array(_row)
        return dict(zip(_columns,_row[_indexes].tolist()))
    def _buildObject (self,**_args):
        """
        :meta       data that is pulled from the decorator function
        :object     row parsed and stored as an object
        :document   existing document being parsed
        """
        _meta = _args['meta']
        _document = _args['document']
        _object = _args['object']
        if 'field' not in _meta and 'container' not in _meta :
            _document = self.merge(_document,_object)
        elif 'field' in _meta :
            field = _meta['field']
            if field in _document :
                _document[field] = self.merge(_document[field],_object)
            else:
                _document[field] = _object
        elif 'container' in _meta :
            _label = _meta['container']
            if _label not in _document :
                _document[_label] = []
            _document[_label].append(_object)
        return _document
    def get_files(self,**_args):
        folder = _args['folder']
        files = []
        if not os.path.exists(folder) :
            return []
        elif os.path.isdir(folder):
            for root,_dir,f in os.walk(folder) :
                if f :
                    files += [os.sep.join([root,name]) for name in f]
            files = [path for path in files if os.path.isfile(path)]
        else:
            files = [folder]
        return files
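The core of `ContentHandler` can be exercised on a toy {x12} string. The following is a standalone sketch that mirrors `split`, `classify` and the padding in `_parse`, so it runs without importing the package (this commit does not show the module path). It makes two deliberate, labeled departures. First, text without any separator is returned as a single segment. Second, a file with no `HL` segment yields an empty block list instead of an `IndexError`. The function names `parse_fields` and the sample segments are illustrative only.

```python
def split(stream):
    """Split raw x12 text into segments, then each segment into '*'-separated elements."""
    # Segment terminator: "~\n", "~" or a newline, checked in that order
    for sep in ("~\n", "~", "\n"):
        if sep in stream:
            return [row.strip().split("*") for row in stream.split(sep) if row.strip()]
    # Assumption: with no terminator the whole text is treated as one segment
    return [stream.strip().split("*")]


def classify(segments):
    """Separate the envelope header from the blocks that each start at an HL segment."""
    starts = [i for i, seg in enumerate(segments) if "HL" in seg]
    if not starts:
        return {"header": segments, "block": []}
    ends = starts[1:] + [len(segments)]
    return {
        "header": segments[: starts[0]],
        "block": [segments[b:e] for b, e in zip(starts, ends)],
    }


def parse_fields(row, columns, indexes):
    """Pick the elements at `indexes`, name them with `columns`, padding short rows with ''."""
    padded = row + [""] * max(0, max(indexes) + 1 - len(row))
    return dict(zip(columns, [padded[i] for i in indexes]))


segments = split("ISA*00~GS*HC~HL*1~NM1*85~HL*2~NM1*IL~")
print(classify(segments)["header"])  # [['ISA', '00'], ['GS', 'HC']]
```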