package com.ziclix.python.sql.pipe;

import com.ziclix.python.sql.util.Queue;
import com.ziclix.python.sql.zxJDBC;
import org.python.core.Py;
import org.python.core.PyObject;

/* loaded from: input_file:lib/jython-1.2007.jar:com/ziclix/python/sql/pipe/Pipe.class */
/**
 * Connects a {@link Source} to a {@link Sink} through a shared {@link Queue},
 * running each side in its own runner and reporting how many rows the sink handled.
 */
public class Pipe {
    /**
     * Starts a {@code SourceRunner} and a {@code SinkRunner} over a fresh queue, waits for
     * both to finish, and then reports the outcome.
     *
     * @param source the producer handed to the {@code SourceRunner}
     * @param sink the consumer handed to the {@code SinkRunner}
     * @return a Python integer holding the sink runner's count ({@code 0} when the sink
     *         handled nothing)
     * @throws RuntimeException the error built by {@code zxJDBC.newError} when either runner
     *         recorded an exception or the waiting thread was interrupted
     *         (NOTE(review): the concrete error type is declared in zxJDBC — confirm)
     */
    public PyObject pipe(Source source, Sink sink) {
        Queue queue = new Queue();
        SourceRunner sourceRunner = new SourceRunner(queue, source);
        SinkRunner sinkRunner = new SinkRunner(queue, sink);

        sourceRunner.start();
        sinkRunner.start();

        try {
            sourceRunner.join();
            sinkRunner.join();
        } catch (InterruptedException e) {
            queue.close();
            // Restore the interrupt status so code further up the stack can still
            // observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            throw zxJDBC.newError(e);
        }

        // A failure on the source side is reported ahead of a failure on the sink side.
        if (sourceRunner.threwException()) {
            throw zxJDBC.newError(sourceRunner.getException().toString());
        }
        if (sinkRunner.threwException()) {
            throw zxJDBC.newError(sinkRunner.getException().toString());
        }

        int sinkCount = sinkRunner.getCount();
        if (sinkCount == 0) {
            // Nothing reached the sink; the row-count comparison below is skipped.
            return Py.newInteger(0);
        }

        int sourceCount = sourceRunner.getCount();
        if (sourceCount != sinkCount) {
            Integer[] counts = {Integer.valueOf(sourceCount), Integer.valueOf(sinkCount)};
            String message = zxJDBC.getString("inconsistentRowCount", counts);
            // NOTE(review): `m598assert` looks like a decompiler-generated alias for a method
            // whose real name is a Java keyword — confirm the actual name before compiling.
            // Passing Py.Zero as the test presumably makes the assertion fail with `message`.
            Py.m598assert(Py.Zero, Py.newString(message));
        }
        return Py.newInteger(sinkCount);
    }
}
