Fix and improve links migration script
This commit is contained in:
parent
7d993637d8
commit
ccf2b6529c
|
@ -7,15 +7,14 @@ import { startOfHour } from "date-fns";
|
||||||
let count = 0;
|
let count = 0;
|
||||||
const queue = new PQueue({ concurrency: 5 });
|
const queue = new PQueue({ concurrency: 5 });
|
||||||
|
|
||||||
queue.on("active", () =>
|
queue.on("active", () => (count % 1000 === 0 ? console.log(count++) : count++));
|
||||||
Math.random() > 0.7 ? console.log(count++) : count++
|
|
||||||
);
|
|
||||||
|
|
||||||
// 1. Connect to Neo4j database
|
// 1. Connect to Neo4j database
|
||||||
const neo4j = NEO4J.driver(
|
const neo4j = NEO4J.driver(
|
||||||
process.env.NEO4J_DB_URI,
|
process.env.NEO4J_DB_URI,
|
||||||
NEO4J.auth.basic(process.env.NEO4J_DB_USERNAME, process.env.NEO4J_DB_PASSWORD)
|
NEO4J.auth.basic(process.env.NEO4J_DB_USERNAME, process.env.NEO4J_DB_PASSWORD)
|
||||||
);
|
);
|
||||||
|
|
||||||
// 2. Connect to Postgres database
|
// 2. Connect to Postgres database
|
||||||
const postgres = knex({
|
const postgres = knex({
|
||||||
client: "postgres",
|
client: "postgres",
|
||||||
|
@ -55,7 +54,7 @@ const postgres = knex({
|
||||||
"OPTIONAL MATCH (v)-[:VISITED_IN]->(dd) " +
|
"OPTIONAL MATCH (v)-[:VISITED_IN]->(dd) " +
|
||||||
"WITH l, u, d, COLLECT([b.browser, o.os, c.country, r.referrer, dd.date]) as stats " +
|
"WITH l, u, d, COLLECT([b.browser, o.os, c.country, r.referrer, dd.date]) as stats " +
|
||||||
"RETURN l, u.email as email, d.name as domain, stats",
|
"RETURN l, u.email as email, d.name as domain, stats",
|
||||||
{ limit: (index + 1) * limit, skip: index * limit }
|
{ limit: limit, skip: index * limit }
|
||||||
)
|
)
|
||||||
.subscribe({
|
.subscribe({
|
||||||
onNext(record) {
|
onNext(record) {
|
||||||
|
@ -134,27 +133,12 @@ const postgres = knex({
|
||||||
...(link.createdAt && { created_at: link.createdAt })
|
...(link.createdAt && { created_at: link.createdAt })
|
||||||
};
|
};
|
||||||
|
|
||||||
const exists = await postgres<Link>("links")
|
const res = await postgres<Link>("links").insert(data, "id");
|
||||||
.where({ address: link.id })
|
const link_id = res[0];
|
||||||
.first();
|
|
||||||
|
|
||||||
let link_id: number;
|
|
||||||
if (exists) {
|
|
||||||
const res = await postgres<Link>("links")
|
|
||||||
.where("id", exists.id)
|
|
||||||
.update(data, "id");
|
|
||||||
link_id = res[0];
|
|
||||||
} else {
|
|
||||||
const res = await postgres<Link>("links").insert(
|
|
||||||
data,
|
|
||||||
"id"
|
|
||||||
);
|
|
||||||
link_id = res[0];
|
|
||||||
}
|
|
||||||
|
|
||||||
// 7. [Postgres] Create visits
|
// 7. [Postgres] Create visits
|
||||||
for await (const [date, details] of Object.entries(visits)) {
|
const newVisits = Object.entries(visits).map(
|
||||||
const data = {
|
([date, details]) => ({
|
||||||
link_id,
|
link_id,
|
||||||
created_at: date,
|
created_at: date,
|
||||||
countries: details.countries as Record<string, number>,
|
countries: details.countries as Record<string, number>,
|
||||||
|
@ -173,20 +157,10 @@ const postgres = knex({
|
||||||
os_macos: details.os_macos as number,
|
os_macos: details.os_macos as number,
|
||||||
os_other: details.os_other as number,
|
os_other: details.os_other as number,
|
||||||
os_windows: details.os_windows as number
|
os_windows: details.os_windows as number
|
||||||
};
|
})
|
||||||
|
);
|
||||||
|
|
||||||
const visitExists = await postgres<Visit>("visits")
|
await postgres<Visit>("visits").insert(newVisits);
|
||||||
.where({ link_id, created_at: data.created_at })
|
|
||||||
.first();
|
|
||||||
|
|
||||||
if (visitExists) {
|
|
||||||
await postgres<Visit>("visits")
|
|
||||||
.where("id", visitExists.id)
|
|
||||||
.update(data);
|
|
||||||
} else {
|
|
||||||
await postgres<Visit>("visits").insert(data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
onCompleted() {
|
onCompleted() {
|
||||||
|
@ -205,7 +179,6 @@ const postgres = knex({
|
||||||
resolve();
|
resolve();
|
||||||
},
|
},
|
||||||
onError(error) {
|
onError(error) {
|
||||||
console.log(error);
|
|
||||||
session.close();
|
session.close();
|
||||||
if ((index + 1) * limit < total) {
|
if ((index + 1) * limit < total) {
|
||||||
queue.add(() => main(index + 1));
|
queue.add(() => main(index + 1));
|
||||||
|
|
Loading…
Reference in New Issue