Arquivos mdb (MS Access) sem ODBC

Trabalhar com arquivos MDB do Microsoft Access no Windows pode ser um tanto fácil via ODBC, mas o mesmo não é verdade em outros sistemas operacionais. Garantir a portabilidade do projeto ao se trabalhar com esse tipo de tecnologia é um desafio que vamos explorar nesse artigo.

A maioria do conteúdo que se encontra pela web tenta trabalhar com arquivos .mdb criando uma ponte JDBC-ODBC e embora você possa encontrar bons drivers ODBC, o único gratuito que encontrei para Linux foi o libmdbodbc (via repositório apt: apt-get install libmdbodbc mdbtools), porém o mesmo não suporta result set, impedindo o seu uso com o Talend/Java. Apesar disso, em conjunto com o pacote mdbtools essa é uma boa ferramenta para explorar arquivos mdb, permitindo executar queries, listar tabelas… tudo fora do Talend, claro.

Há também a possibilidade de obter drivers puramente JDBC, como o desenvolvido pela HXTT, porém esta não é uma solução gratuita. Você pode fazer o download de um trial por 30 dias, mas o result set das queries tem um limite de 1000 registros.

Assim sendo, utilizaremos um método não JDBC, tampouco ODBC, através da biblioteca Jackcess. Esta biblioteca contém uma série de funcionalidades que nos permite trabalhar com este tipo de tecnologia. Demonstrarei agora como utilizar algumas destas funcionalidades e ler o conteúdo de tabelas em arquivos access.

Primeiramente você deve ir até o site do projeto Jackcsess e realizar o download da biblioteca. Feito isso vamos carregar esta biblioteca em nosso job. Para isso utilizamos o componente tLibraryLoad, conforme na imagem abaixo:

Demonstração do componente tLibraryLoad

Observe que para carregar uma biblioteca que não está na listagem do componente, basca clicar no botão à direita da combo box e apontar para o arquivo com extensão .jar desejado. Além disso, observe também que temos três componentes tLibraryLoad neste job, isto se deve ao fato da biblioteca Jackcess possuir duas dependências: as bibliotecas commons-logging.jar e commons-lang-2.6.jar, ambas já existentes na listagem de bibliotecas do Talend.

Agora que já carregamos as bibliotecas necessárias, partiremos para a leitura dos dados do arquivo. Suponha que desejamos carregar estes dados em um Mysql. Utilizaremos então dois componentes: tJavaFlex e tMysqlOutput.

O componente tJavaFlex possui uma funcionalidade mesclada dos componentes tJava e tJavaRow. Através dele você pode trabalhar o código em três partes diferentes: Begin, Main e End. Mas antes que possamos compreender isto vamos definir o schema da tabela que pretendemos ler. Se você está em um ambiente Windows pode simplesmente abrir o Microsoft Access para obter os metadados da tabela, mas caso esteja em um ambiente Linux e tenha instalada a ferramenta mdbtools, pode executar o comando:

mdb-schema arquivo.mdb

Este comando irá listar os metadados de todas as tabelas existentes neste arquivo, e então você poderá utilizar o resultado deste comando para definir o schema do componente tJavaFlex.

Feito isso, vamos ao código que irá ler uma das tabelas e mapear o conteúdo obtido com as colunas definidas em nosso schema. Primeiramente, vamos importar as classes utilizadas, nós já carregamos as bibliotecas com os componentes tLibraryLoad, mas temos que dizer ao Java quais classes especificamente pretendemos utilizar destas bibliotecas. Para isso vá até o componente tJavaFlex e na aba Advanced:

Classes a serem importadas

Tendo importado as classes acima, vá até a aba Basic do componente tJavaFlex:

Esta aba será dividida basicamente em três partes: Start code, Main code e End code. Todo o código escrito dentro de Start code é executado uma única vez em um fluxo de dados: no início. Já o código escrito em Main code será executado linha a linha, permitindo que realizemos transformações ou meramente propaguemos dados obtidos. E por fim o código escrito em End code será executado uma única vez ao finalizar o fluxo de dados.

Esta abordagem nos permite definir no Start code o cabeçalho de um loop enquanto no Main code teremos basicamente o “corpo” do loop, onde estamos propagando o conteúdo das colunas obtidas para o fluxo “row3” que é aquele “levando” dados para o componente tMysqlOutput. Observe como no campo End code meramente fechamos o corpo do loop for com uma chave.

Além disso, no campo Start code criamos um objeto do tipo Table que recebe o resultado do método getTable, que por sua vez foi executado em um objeto do tipo Database obtido através do método Database.open(). Este objeto do tipo Table é o que nos permite executar um loop e obter os campos desejados de cada registro da tabela especificada, conforme você pode notar na declaração do loop for e no campo Main code.

Esta foi uma breve solução, você pode obter muito mais com a biblioteca Jackcess, existe uma boa documentação da API aqui.

Até a próxima.

Anúncios

Otimizando extração e carga de dados utilizando componentes Bulk

Neste artigo vou demonstrar como otimizar a extração e carga de dados quando utilizando componentes Bulk. Para a maioria das tecnologias de bancos de dados suportadas pelo Talend existem os componentes BulkOutput, BulkExec e OutputBulkExec, estes componentes permitem a extração para arquivo (OutputBulk), carga do arquivo utilizando o método de load do próprio banco (BulkExec) e extração e carga utilizando o mesmo componente (OutputBulkExec).

O job de demonstração abaixo foi desenvolvido para a extração de dados de um banco de dados Oracle e carga em um Sybase IQ, mas alguns tweaks foram necessários para otimizar da performance do mesmo, pois apenas esta tabela possuia um volume na casa dos terabytes.

Para começar, primeiramente estabelecemos as conexões com os databases. Como este job executa mais de um subjob em paralelo (veremos adiante como ativar multithreads) temos de garantir que as conexões sejam estabelecidas antes da execução de qualquer outro subjob, para isso utilizamos o componente tPrejob, e por segurança caso a última execução tenha terminado sem sucesso deletamos os arquivos extraídos utilizando os componentes tFileList e tFileDelete. Além do tPrejob, existe o tPostjob, que garante que os subjobs ligados a ele através de triggers sejam executados somente após a execução dos demais subjobs não ligados a ele.

Job completo: clique para visualizar em tamanho real

Um subjob é um segmento do job que pode ser executado independentemente da execução de outros segmentos, ou pode estar conectado a outros através de triggers. Graficamente eles são delimitados por retângulos de fundo azul claro, laranja ou outra cor caso você tenha customizado.

Estabelecidas as conexões e removidos os arquivos de execuções anteriores, damos início à extração e carga de nossos dados.

O subjob logo abaixo do Prejob executa uma query no nosso banco destino, neste caso um Sybase IQ, obtendo o maior ID da tabela que pretendemos carregar, desta forma podemos extrair de nossa fonte apenas os registros novos, cujo ID seja maior que o carregado na última execução. O resultado desta query é propagado ao componente tJavaRow, que armazena o valor em uma variável de contexto.

Trabalhar com o tJavaRow é simples: clique em Sync Columns e então em Generate code, e o Talend irá gerar um código geralmente no padrão

output_row.MAX = input_row.MAX;

onde MAX é o nome de um dos campos do schema do componente (neste caso há apenas um campo), output_row é a saída do tJavaRow e input_row é a entrada, o que ele está recebendo do componente predecessor. Mas como não pretendemos propagar dados a partir deste componente, substituímos output_row por context.NOME_DA_VARIAVEL ficando desta forma:

context.NOME_DA_VARIAVEL = input_row.MAX;

Para criar variáveis de contexto vá até a view Contexts, adicione suas variáveis, defina seus tipos e valores default, você pode ver mais detalhes neste artigo: Usando variáveis de contexto.

Extraído este valor máximo e armazenado em uma variável, partimos para o subjob de extração dos dados da fonte e para garantir que esta etapa seja executada somente após a execução da etapa anterior, ligamos os dois subjobs com a trigger OnSubjobOk.

Este banco de dados que utilizamos como origem é um Oracle RAC (Real Application Cluster), um cluster de databases Oracle para garantir a alta disponibilidade. Como o Talend ainda não suporta a conexão a RAC especificando mais de um host (especificando um apenas funciona, mas não garante a alta disponibilidade) utilizamos uma conexão do tipo General JDBC, onde podemos editar a URL de conexão livremente (O suporte a conexão com RACs Oracle utilizando os componentes desta tecnologia está previsto para a versão 5.0).

Então editamos a query no database de origem, para trazer apenas os dados onde o ID seja maior do que o presente no database destino. Para isso selecione o componente de input e vá em Query, lá edite a query para que ela fique semelhante a esta:

"SELECT
...
...
FROM ORIGEM
WHERE ID > " + context.VARIAVEL

Observe como adicionamos a variável de contexto à query: como no fim das contas isso é uma String para a linguagem Java, colocamos a parte constante entre aspas e então concatenamos o conteúdo da variável, que será definido em tempo de execução, utilizando o operador +.

Dica de desempenho: em alguns componentes de Input de algumas tecnologias, existem opções que permitem otimizar a forma como os dados são extraídos. Nos componentes Oracle e JDBC (dentre outros) existe a opção Enable Cursor que em alguns casos permite obter um ganho de performance considerável. Já no componente de input do MySQL existe a opção Enable stream que também pode incrementar sua performance na extração de dados. Mas atenção: com a opção de stream no Mysql não é possível utilizar uma única conexão em modo multithread com a opção Use an existing connection.

E como modo de armazenamento intermediário destes dados obtidos do database de origem utilizamos arquivo(s) de texto puro. Neste caso utilizaríamos normalmente um componente OutputBulk, mas ao invés disso utilizamos um componente tFileOutputDelimited, que no fim das contas tem o mesmo propósito: gerar um arquivo de texto puro, mas além disso possui uma opção interessante: na guia Advanced do componente ativamos a opção Split output in several files que fará surgir uma outra opção chamada Rows in each output file, com estas opções especificamos que o conjunto de registros obtidos da fonte deverá ser dividido em vários arquivos com um número máximo de registros, cujo valor é definido por você. Neste caso defini um limite de 100 milhões de registros por arquivo. Assim teremos vários arquivos com o nome PRODUCTS1.dat, PRODUCTS2.dat…


O propósito de se dividir o resultset em vários arquivos você entenderá agora com a descrição do outro subjob, que é executado em paralelo ao que foi descrito até aqui.

Este subjob executa enquanto o outro subjob não tiver concluído sua execução. Para isso utilizamos o componente tLoop, onde podemos definir o tipo de Loop que pretendemos executar (for ou while) e as condições de sua execução. O loop “for” é recomendado quando você consegue estimar o número de execuções e o loop “while”, como o próprio nome já diz, é recomendado quando você deseja executar ENQUANTO uma condição for verdadeira. Utilizaremos o loop while e mais à frente irei mostrar como construíremos a condição de execução.

Após o tLoop temos o componente tSleep que permite pausarmos a execução deste subjob por um tempo pré determinado, neste caso um minuto. Observe que a ligação entre o tLoop e tSleep é feita através da linha Iterate, pois não há fluxo de dados sendo transmitido, apenas uma repetição de execuções.

Quando o componente tSleep termina a sua execução é chamado o componente tFileList através da trigger OnComponentOk, ou seja, quando a execução apenas deste componente (tSleep) tiver sido concluída, e então o tFileList irá listar o conteúdo de um diretório especificado por você, sendo que é possível adicionar uma máscara para os arquivos listados para que você filtre o resultado.

Estamos listando o conteúdo do diretório onde o componente tFileOutputDelimited está criando arquivos, então definimos uma máscara com o formato de nome que esperamos para estes arquivos, neste caso “PRODUCTS*.dat”, onde * é um caractere curinga onde esperamos a sequència de números que o tFileOutputDelimited irá inserir na criação dos arquivos.


O resultado dos arquivos listados pode ser obtido através de um loop mais uma vez utilizando a linha Iterate, desta forma conectamos o componente tFileList e tSystem.

Através do componente tSystem executamos comandos do sistema, independente do sistema onde o job está sendo executado, Windows, Linux, FreeBSD… quem sabe outros.. E neste especificamente executamos um comando pertencente a sistemas Unix e similares: o lsof, que lista os arquivos abertos no momento por algum processo.

Observe o comando sendo executado:

"sudo /usr/sbin/lsof " + ((String)globalMap.get("tFileList_1_CURRENT_FILEPATH"))

Chamamos o comando lsof e concatenamos à String o caminho do arquivo atual listado pelo componente tFileList, mas como obter este caminho?

Observe que no canto esquerdo inferior da suite, abaixo do repositório, existe uma view chamada Outline. Dentro desta view existem variáveis de retorno dos componentes que você está utilizando no job atualmente aberto, como número de linhas inseridas em um arquivo ou erro retornado por um componente. Para utilizar estas variáveis você simplesmente seleciona e arrasta para o local desejado, um campo de código livre como no tJava ou tJavaRow ou mesmo um campo de opção de algum componente, como iremos fazer agora.

Para obter os nomes dos arquivos que listamos no componente tFileList, incluindo o caminho de sua localização, e concatenar esta String ao comando lsof, vamos até a view Outline, expandimos a visão do componente que desejamos, neste caso o tFileList_1 e arrastamos a variável Current File Name with path para o campo onde executamos o comando do sistema no componente tSystem, concatenando as Strings com o operador +.

Atenção: Tome o cuidado de observar o nome único do componente na view Outline, neste job temos por exemplo os componentes tFileList_1 e tFileList_2. Você pode obter o nome único do componente clicando no mesmo e olhando o cabeçalho da view Component.

Agora que já listamos os arquivos criados naquele diretório e executamos o comando lsof sobre a lista obtida, partimos para a carga em si destes arquivos no database destino e então você entenderá o por que de tudo isso.

Como estamos gerando vários arquivos separados em 100 milhões de registros e em paralelo temos um subjob responsável pela carga destes arquivos no Sybase IQ, temos de garantir que a carga dos arquivos extraídos somente seja executada após a conclusão destes, ou seja, após o seu fechamento. Para isso executamos o comando lsof e, dependendo do resultado, saberemos se o arquivo está fechado, significando que o mesmo está pronto para carga, ou aberto, significando que a extração de registros para este arquivo ainda não foi concluída.

Então utilizamos o componente BulkExec da tecnologia destino, aqui um Sybase IQ, para carregar o arquivo do loop atual. Neste componente você define o schema da tabela destino e aponta para o arquivo a ser carregado, mais uma vez obtendo o nome do arquivo e caminho listado no tFileList através da view Outline.

E para garantir que sejam carregados apenas os arquivos fechados, conectamos o componente tSystem ao tSybaseIQBulkExec utilizando a trigger If, onde poderemos especificar uma condição para execução do componente sucessor. Como condição utilizamos o retorno do componente tSystem, que caso seja igual a 1 indica que o arquivo atual não foi listado, ou seja, não está aberto por nenhum processo e então podemos carregá-lo.

Por fim, chamamos os componentes tSybaseCommit e tFileDelete logo após a carga do arquivo, que como os próprios nomes descrevem, realizam o Commit da transação e exclusão do arquivo carregado (mais uma vez utilizamos a variável Current file with path do componente tFileList).

Agora os detalhes adicionais para conclusão deste job. Como condição de execução do componente tLoop, queremos que este subjob seja executado enquanto houver algum arquivo a carregar, então irei lhe mostrar como construir esta condição:

Procure pelo nome único do componente tFileOutputDelimited dentro de Outline (neste caso provavelmente haverá apenas tFileOutputDelimited1) e arraste a variável Number of line para o campo Condition do componente tLoop e compare o seu valor com null, ficando desta maneira:

(((Integer)globalMap.get("tFileOutputDelimited_1_NB_LINE")) == null)

Desta forma garantimos que o subjob continue sua execução mesmo se o seu início se der antes do subjob de extração dos dados, pois o número de linhas retornado pelo componente tFileOutputDelimited será igual a null. Mas isto não é suficiente: devemos agora testar se o componente tFileList retornou algum arquivo e para isto arrastamos a variável Number of files da view outline e separamos as condições pelo operador || , que significa OU. Ainda, damos uma margem caso o componente tFileList ainda não tenha sido executado, comparando o conteúdo da variável Number of files também com null e aglomeramos estes dois testes entre parenteses para que seja retornato true (verdadeiro) caso um ou o outro retorne true, mesmo que o outro teste retorne false.

(((Integer)globalMap.get("tFileList_1_NB_FILE")) >= 1 || ((Integer)globalMap.get("tFileList_1_NB_FILE")) == null)

Juntos, os três testes com compoem a condição do loop ficarão assim:

(((Integer)globalMap.get("tFileOutputDelimited_1_NB_LINE")) == null) || (((Integer)globalMap.get("tFileList_1_NB_FILE")) >= 1 || ((Integer)globalMap.get("tFileList_1_NB_FILE")) == null)

Para concluir, utilizamos o componente tPostJob para conectar os componentes tSybaseClose e tJDBCClose, que como os nomes já descrevem, fecham as conexões estabelecidas logo no início da execução do job.

Um último detalhe antes de pressionar F6 é habilitar a execução multithread. Vá até a view Job, entre em Extra e habilite a opção Multithread Execution.

Conclusão

Para muitas cargas quando se tratam de alguns megabytes não há a necessidade de um job como este, provavelmente você utilizará um “input” e um “outputbulkexec“, mas conforme o volume de dados aumenta, a complexidade dos jobs responsáveis por sua carga se eleva proporcionalmente se você quiser incrementar a performance.

Com esta abordagem o espaço em disco não precisa ser uma preocupação: imagine extrair alguns terabytes de dados em um único arquivo para só então carregar este arquivo, enquanto você pode consumir apenas alguns gigabytes para isso!

Além do espaço em disco, há um ganho de tempo considerável, pois o database destino não precisa ficar horas (ou dias!) esperando a extração ser concluída para só então realizar a carga, enquanto pode fazê-la em partes.

Por fim, a carga de dados a cada 100 milhões de registros torna o processo mais simples de se recuperar de qualquer execução falha: imagine se a 99% da extração dos dados da fonte a rede vai abaixo? Você tem a segurança de que ao menos algo em torno de 95% já tenha sido carregado no destino com sucesso e é esse o motivo de excluírmos quaisquer arquivos existentes no início do processo: pode haver algum registro truncado e então extraímos apenas os registros cujo ID seja maior do que o último carregado no destino.