Friday, June 1, 2012

Synchronize table in PL/SQL

I've come across a common problem: synchronizing data across two databases. Here's a simple example of one-way synchronization. Please note that this is a simplified solution with known limitations:
  • This solution might have difficulties with hierarchical tables (parent-child structure).
  • You may come across foreign key constraint issues.

Two id parameters are provided in case the primary key is a composite one.

 
-------- get column list ---------
  -- Builds a comma-separated list of the column names of a table in the
  -- current schema, leaving out up to two key columns.
  --
  -- Parameters:
  --   pp_table_name  table name, compared as-is with USER_TAB_COLUMNS.table_name
  --                  (the dictionary stores unquoted identifiers in upper case).
  --   p_id, p_id2    column names to leave out of the list; compared
  --                  case-insensitively. Pass a value that matches no column
  --                  (for example ' ') to get every column.
  -- Returns:
  --   'COL1, COL2, ...' in COLUMN_ID order, without a trailing separator, or
  --   NULL when no column qualifies (unknown table, only key columns, or a
  --   NULL p_id / p_id2, because a comparison with NULL is never true).
  function get_column_list(
    pp_table_name varchar2,
    p_id varchar2 default 'id',
    p_id2 varchar2 default 'id'
  ) return varchar2
  is
  v_list varchar2(32767);
  begin
  -- The cursor FOR loop opens, fetches and closes the cursor itself, so the
  -- cursor is released even when an exception is raised inside the loop.
  -- ORDER BY column_id keeps the order identical across calls; callers pair
  -- two results of this function as INSERT target list and SELECT list.
  for column_row in (
    select column_name
    from USER_TAB_COLUMNS
    where table_name = pp_table_name
    order by column_id
  )
  loop
    if (upper(column_row.column_name) <> upper(p_id) and upper(column_row.column_name) <> upper(p_id2)) then
      -- Prepend the separator only when something is already in the list, so
      -- the result never ends in ', ' even if the last column is excluded.
      if (v_list is not null) then
        v_list := v_list || ', ';
      end if;
      v_list := v_list || column_row.column_name;
    end if;
  end loop;
  return v_list;
  end;

------ helper function for sql building -----
  -- Fills the placeholders of a SQL template with the given identifiers.
  --
  -- Parameters:
  --   p_sql          template text containing any of the placeholders below.
  --   table_name     replaces every '{table_name}'.
  --   remote_dblink  replaces every '{remote_dblink}'.
  --   p_id           replaces every '{id}'.
  --   p_id2          replaces every '{id2}'.
  -- Returns:
  --   the template with all placeholders substituted. A NULL argument removes
  --   its placeholder, because REPLACE with a NULL replacement deletes the match.
  --
  -- The values are inserted verbatim (plain text substitution, no quoting),
  -- so callers must only pass trusted identifiers.
  function sql_param
  (
    p_sql varchar2,
    table_name varchar2,
    remote_dblink varchar2,
    p_id varchar2,
    p_id2 varchar2
  )return varchar2
  is
  -- Sized to the PL/SQL VARCHAR2 maximum so that statements longer than
  -- 5000 characters no longer raise ORA-06502 during substitution.
  v_q varchar2(32767);
  begin
  -- Substitution order is kept fixed: a replaced value that itself contains a
  -- later placeholder is expanded by the following steps.
  v_q:= replace(p_sql, '{table_name}', table_name);
  v_q:= replace(v_q, '{remote_dblink}', remote_dblink);
  v_q:= replace(v_q, '{id}', p_id);
  v_q:= replace(v_q, '{id2}', p_id2);
  return v_q;
  end;
------ table synchronization ---

-- One-way synchronization of a local table from the table of the same name
-- behind a database link. Runs three dynamic statements in order:
--   1. delete local rows whose key is not present remotely,
--   2. update the non-key columns of the local rows from the remote rows,
--   3. insert remote rows whose key is not present locally.
--
-- Parameters:
--   pp_table_name  name of the table, used locally and as {name}@{dblink}.
--   remote_dblink  name of the database link to the source database.
--   p_id, p_id2    key column names; pass the same name twice for a
--                  single-column key.
--
-- No COMMIT is issued here. The names are concatenated into the statements
-- unquoted and unvalidated, so only trusted identifiers must be passed.
procedure synchronize_table(
    pp_table_name varchar2,
    remote_dblink varchar2,
    p_id varchar2 default 'id',
    p_id2 varchar2 default 'id'
  )
  is
  v_sql varchar2(32767);
  v_data_columns varchar2(32767);
  v_all_columns varchar2(32767);
  begin
  -- Build each column list once. The trailing ', ' is trimmed defensively in
  -- case the list ends with a separator (the excluded key column came last).
  v_data_columns := rtrim(matprep2bmw_migration.get_column_list(pp_table_name, p_id, p_id2), ', ');
  -- ' ' matches no column name, so this yields every column of the table.
  v_all_columns := rtrim(matprep2bmw_migration.get_column_list(pp_table_name, ' ', ' '), ', ');

  -- delete
  v_sql := sql_param('delete from {table_name} where ({id}, {id2}) not in (select {id}, {id2} from {table_name}@{remote_dblink})', pp_table_name, remote_dblink, p_id, p_id2);
  execute immediate v_sql;
  
  -- update
  -- In Oracle '' is NULL, so a "<> ''" test is never true; IS NOT NULL is the
  -- working check for "there is at least one non-key column to update".
  if (v_data_columns is not null) then
    -- Aliases l (local) and r (remote) make the correlation explicit; without
    -- them {table_name}.{id} inside the subquery can resolve to the remote
    -- table itself and compare the key column with itself.
    v_sql := sql_param('update {table_name} l set ('||v_data_columns||') = (select '||v_data_columns||' from {table_name}@{remote_dblink} r where r.{id} = l.{id} and r.{id2} = l.{id2})', pp_table_name, remote_dblink, p_id, p_id2);
    custom_output(v_sql);
    execute immediate v_sql;  
  end if;
  
  
  -- insert
  v_sql:= sql_param('insert into {table_name}('||v_all_columns||') select '||v_all_columns||' from {table_name}@{remote_dblink} where ({id}, {id2}) not in (select {id}, {id2} from {table_name})',
  pp_table_name, remote_dblink, p_id, p_id2);
  custom_output(v_sql);
  execute immediate v_sql;
  end;